"""HTTP session utilities for MCP Atlassian."""
from __future__ import annotations
import logging
from typing import Iterable
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from mcp_atlassian.utils.logging import log_config_param
# Logger shared under the fixed "mcp-atlassian" name rather than this module's own name.
logger = logging.getLogger("mcp-atlassian")
# Fallback (connect, read) timeout pair returned by _resolve_timeout when the
# caller supplies no timeout at all. Presumably seconds, following the
# requests `timeout` convention -- confirm.
DEFAULT_CONNECT_TIMEOUT = 10.0
DEFAULT_READ_TIMEOUT = 30.0
# Retry budget and backoff factor handed to urllib3's Retry by
# configure_http_session when the caller does not override them.
DEFAULT_RETRY_MAX = 3
DEFAULT_RETRY_BACKOFF = 0.5
# Response status codes that trigger a retry by default.
DEFAULT_RETRY_STATUS_CODES = (429, 500, 502, 503, 504)
# HTTP methods retried by default; note that POST and PATCH are not listed.
DEFAULT_RETRY_METHODS = ("HEAD", "GET", "PUT", "DELETE", "OPTIONS")
def _coerce_int(value: object, default: int | None = None) -> int | None:
if isinstance(value, bool):
return default
if isinstance(value, int):
return value
return default
def _coerce_float(value: object, default: float | None = None) -> float | None:
if isinstance(value, bool):
return default
if isinstance(value, int | float):
return float(value)
return default
def _resolve_timeout(
http_timeout: float | None,
connect_timeout: float | None,
read_timeout: float | None,
) -> float | tuple[float, float]:
if connect_timeout is None and read_timeout is None:
if http_timeout is None:
return (DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT)
return http_timeout
resolved_connect = (
connect_timeout if connect_timeout is not None else DEFAULT_CONNECT_TIMEOUT
)
resolved_read = read_timeout if read_timeout is not None else DEFAULT_READ_TIMEOUT
return (resolved_connect, resolved_read)
def _apply_default_timeout(
session: Session, timeout: float | tuple[float, float]
) -> None:
if getattr(session, "_mcp_default_timeout", None) == timeout:
return
original_request = session.request
def request_with_timeout(method: str, url: str, **kwargs):
if kwargs.get("timeout") is None:
kwargs["timeout"] = timeout
return original_request(method, url, **kwargs)
session.request = request_with_timeout # type: ignore[assignment]
session._mcp_default_timeout = timeout # type: ignore[attr-defined]
def _is_value_collection(candidate: object) -> bool:
    """Return True for iterables of values, excluding bare strings/bytes."""
    # A string is iterable too, but iterating it yields single characters,
    # which is never what a list-of-methods / list-of-codes setting means.
    return isinstance(candidate, Iterable) and not isinstance(
        candidate, (str, bytes, bytearray)
    )


def configure_http_session(
    session: Session,
    service_name: str,
    http_timeout: float | None = None,
    http_connect_timeout: float | None = None,
    http_read_timeout: float | None = None,
    retry_max: int | None = None,
    retry_backoff: float | None = None,
    retry_statuses: Iterable[int] | None = None,
    retry_methods: Iterable[str] | None = None,
    pool_connections: int | None = None,
    pool_maxsize: int | None = None,
) -> None:
    """Configure timeouts, retries, and pooling for a requests session.

    A default timeout is installed on ``session`` and a single ``HTTPAdapter``
    carrying the retry policy and pool sizes is mounted for both ``https://``
    and ``http://``. The resolved settings are reported through
    ``log_config_param`` when INFO logging is enabled.

    Arguments of an unexpected type are not rejected; they fall back to their
    defaults.

    Args:
        session: Session to configure in place.
        service_name: Label passed to ``log_config_param`` with every setting.
        http_timeout: General timeout; merged with the two values below by
            ``_resolve_timeout``.
        http_connect_timeout: Connect component of the timeout pair.
        http_read_timeout: Read component of the timeout pair.
        retry_max: Retry budget used for the total, connect, read and status
            counters. Defaults to ``DEFAULT_RETRY_MAX``; negative values are
            clamped to 0, and 0 disables retries.
        retry_backoff: Backoff factor. Defaults to ``DEFAULT_RETRY_BACKOFF``;
            negative values are clamped to 0.
        retry_statuses: Status codes that trigger a retry. Any non-string
            iterable is accepted; anything else selects
            ``DEFAULT_RETRY_STATUS_CODES``.
        retry_methods: HTTP methods eligible for retry. Any non-string
            iterable is accepted; names are stripped and upper-cased, blank or
            non-string entries are dropped. ``DEFAULT_RETRY_METHODS`` is used
            when nothing usable remains.
        pool_connections: ``HTTPAdapter`` ``pool_connections``; default 10.
        pool_maxsize: ``HTTPAdapter`` ``pool_maxsize``; default 10.
    """
    default_pool_size = 10

    # --- timeout ---------------------------------------------------------
    timeout = _resolve_timeout(
        _coerce_float(http_timeout),
        _coerce_float(http_connect_timeout),
        _coerce_float(http_read_timeout),
    )
    _apply_default_timeout(session, timeout)

    # --- retry budget and backoff ------------------------------------------
    resolved_retry_max = _coerce_int(retry_max)
    if resolved_retry_max is None:
        resolved_retry_max = DEFAULT_RETRY_MAX
    resolved_retry_max = max(resolved_retry_max, 0)

    resolved_retry_backoff = _coerce_float(retry_backoff)
    if resolved_retry_backoff is None:
        resolved_retry_backoff = DEFAULT_RETRY_BACKOFF
    resolved_retry_backoff = max(resolved_retry_backoff, 0.0)

    # --- retry status codes and methods -------------------------------------
    # The parameters are annotated as Iterable, so honour every iterable
    # (frozenset, generator, ...) rather than only list/tuple/set.
    if _is_value_collection(retry_statuses):
        resolved_retry_statuses = list(retry_statuses)
    else:
        resolved_retry_statuses = list(DEFAULT_RETRY_STATUS_CODES)

    resolved_retry_methods = set(DEFAULT_RETRY_METHODS)
    if _is_value_collection(retry_methods):
        cleaned_methods = {
            method.strip().upper()
            for method in retry_methods
            if isinstance(method, str) and method.strip()
        }
        # urllib3 treats an empty/falsy ``allowed_methods`` as "retry every
        # method", which would silently include non-idempotent verbs such as
        # POST. Keep the defaults unless at least one real method was given.
        if cleaned_methods:
            resolved_retry_methods = cleaned_methods

    # --- connection pool -----------------------------------------------------
    resolved_pool_connections = _coerce_int(pool_connections)
    if resolved_pool_connections is None:
        resolved_pool_connections = default_pool_size

    resolved_pool_maxsize = _coerce_int(pool_maxsize)
    if resolved_pool_maxsize is None:
        resolved_pool_maxsize = default_pool_size

    # --- adapter ---------------------------------------------------------
    # A budget of 0 means "no retries": hand the adapter a plain 0 instead of
    # building a Retry policy.
    max_retries = 0
    if resolved_retry_max > 0:
        max_retries = Retry(
            total=resolved_retry_max,
            connect=resolved_retry_max,
            read=resolved_retry_max,
            status=resolved_retry_max,
            backoff_factor=resolved_retry_backoff,
            status_forcelist=resolved_retry_statuses,
            allowed_methods=resolved_retry_methods,
            respect_retry_after_header=True,
            # Return the final response instead of raising once status
            # retries are exhausted.
            raise_on_status=False,
        )

    adapter = HTTPAdapter(
        max_retries=max_retries,
        pool_connections=resolved_pool_connections,
        pool_maxsize=resolved_pool_maxsize,
    )
    session.mount("https://", adapter)
    session.mount("http://", adapter)

    # --- report effective settings -------------------------------------------
    if logger.isEnabledFor(logging.INFO):
        log_config_param(logger, service_name, "HTTP_TIMEOUT", str(timeout))
        log_config_param(logger, service_name, "RETRY_MAX", str(resolved_retry_max))
        log_config_param(
            logger, service_name, "RETRY_BACKOFF", str(resolved_retry_backoff)
        )
        log_config_param(
            logger, service_name, "RETRY_STATUS_CODES", str(resolved_retry_statuses)
        )
        log_config_param(
            logger,
            service_name,
            "RETRY_METHODS",
            ",".join(sorted(resolved_retry_methods)),
        )
        log_config_param(
            logger, service_name, "POOL_CONNECTIONS", str(resolved_pool_connections)
        )
        log_config_param(
            logger, service_name, "POOL_MAXSIZE", str(resolved_pool_maxsize)
        )