"""Core HTTP client functionality using HTTPX."""
from contextlib import asynccontextmanager
import httpx
from src.auth.base import ComfyAuth
from src.utils import get_global_logger
from .response import ResponseGetData
# Module-level logger, obtained from get_global_logger under the name "ComfyUI_MCP".
logger = get_global_logger("ComfyUI_MCP")
@asynccontextmanager
async def httpx_session_context(
    session: httpx.AsyncClient | None = None,
    timeout: int = 30,
    verify: bool = True,
):
    """Yield an HTTPX async client, creating one only when none is supplied.

    When ``session`` is given it is yielded untouched and is NOT closed on
    exit; its lifecycle stays with the caller, and ``timeout`` / ``verify``
    are not applied to it. When ``session`` is ``None`` a fresh
    ``httpx.AsyncClient`` is built from ``timeout`` and ``verify`` and is
    closed automatically when the context exits.

    Args:
        session: Existing HTTPX client to reuse, or ``None`` to create one.
        timeout: Timeout in seconds for a newly created client (default: 30).
        verify: Whether a newly created client verifies SSL certificates
            (default: True).

    Yields:
        The ``httpx.AsyncClient`` to use for requests.

    Example:
        >>> async with httpx_session_context(timeout=10) as client:
        ...     response = await client.get("http://example.com")
    """
    # Guard clause: a caller-owned client is handed straight back and left open.
    if session is not None:
        yield session
        return

    # No client supplied: own one for the duration of the context so that
    # ``async with`` closes it on the way out.
    timeout_config = httpx.Timeout(timeout)
    async with httpx.AsyncClient(timeout=timeout_config, verify=verify) as owned_client:
        yield owned_client
# Content-type fragments whose bodies are returned as raw bytes when the
# payload cannot be parsed as JSON.
_BINARY_CONTENT_TYPES = ("image/", "audio/", "video/", "application/octet-stream")


def _parse_response_body(response: httpx.Response):
    """Convert an HTTPX response body into JSON data, raw bytes, text, or None.

    Resolution order:
        1. Empty body -> ``None``.
        2. Body parses as JSON -> the decoded JSON value.
        3. Content type looks binary (image/audio/video/octet-stream) -> ``bytes``.
        4. Otherwise -> the decoded text, or ``None`` if that text is empty.
    """
    if not response.content:
        return None

    try:
        return response.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; anything else is a genuine bug and should propagate.
        # Header values are compared case-insensitively (e.g. "Image/PNG").
        content_type = response.headers.get("content-type", "").lower()
        if any(marker in content_type for marker in _BINARY_CONTENT_TYPES):
            return response.content  # Raw bytes for binary payloads
        return response.text or None


async def get_data(
    url: str,
    method: str,
    auth: ComfyAuth,
    body: dict | None = None,
    params: dict | None = None,
    session: httpx.AsyncClient | None = None,
    timeout: int = 30,
    headers: dict | None = None,
) -> ResponseGetData:
    """Core HTTP request function with authentication injection.

    This is the central function for making HTTP requests to ComfyUI APIs.
    It handles:
        - Authentication header injection via ComfyAuth
        - Session management and connection pooling
        - Response standardization (JSON, binary bytes, text, or None)

    Non-2xx responses are NOT raised; they are reported through the
    ``status`` and ``is_success`` fields of the returned object.

    Following the Dependency Inversion Principle, this function depends on
    the ComfyAuth abstraction rather than concrete implementations.

    Args:
        url: Full URL for the request
        method: HTTP method (GET, POST, HEAD, DELETE, etc.)
        auth: Authentication instance for header injection
        body: Optional JSON body for POST/PUT requests
        params: Optional query parameters
        session: Optional HTTPX client for connection pooling. When given,
            it is reused as-is and left open for the caller to close.
        timeout: Request timeout in seconds (default: 30). Only applied when
            no ``session`` is supplied; a provided session keeps its own
            timeout configuration.
        headers: Optional additional headers. These take precedence over
            the auth headers, which in turn take precedence over the default
            ``Content-Type: application/json``.

    Returns:
        ResponseGetData with status, parsed response, success flag,
        response headers, and the final URL (redirects are followed).

    Raises:
        httpx.HTTPError: For connection errors, timeouts, etc.

    Example:
        >>> auth = NoAuth("http://127.0.0.1:8188")
        >>> res = await get_data(
        ...     url=f"{auth.base_url}/queue",
        ...     method="GET",
        ...     auth=auth,
        ... )
        >>> print(res.status, res.is_success)
        200 True
    """
    # Later entries win on key collisions: defaults < auth < caller headers.
    request_headers = {
        "Content-Type": "application/json",
        **auth.auth_header,  # Inject authentication headers
        **(headers or {}),  # Merge additional headers
    }

    # Reuse the caller's client if provided, otherwise create a short-lived one.
    async with httpx_session_context(session=session, timeout=timeout) as client:
        response = await client.request(
            method=method,
            url=url,
            headers=request_headers,
            json=body,  # Serialized to JSON by HTTPX
            params=params,
            follow_redirects=True,
        )
        response_data = _parse_response_body(response)

        # Build standardized response
        return ResponseGetData(
            status=response.status_code,
            response=response_data,
            is_success=response.is_success,
            headers=dict(response.headers),
            url=str(response.url),
        )