ClickUp Operator

from __future__ import annotations import os from typing import Any, Union, Mapping, TypeVar from typing_extensions import Self, override import httpx from ... import _exceptions from ..._types import NOT_GIVEN, Timeout, NotGiven from ..._utils import is_dict, is_given from ..._version import __version__ from ..._streaming import Stream, AsyncStream from ..._exceptions import APIStatusError from ..._base_client import ( DEFAULT_MAX_RETRIES, BaseClient, SyncAPIClient, AsyncAPIClient, FinalRequestOptions, ) from ._stream_decoder import AWSEventStreamDecoder from ...resources.messages import Messages, AsyncMessages from ...resources.completions import Completions, AsyncCompletions DEFAULT_VERSION = "bedrock-2023-05-31" _HttpxClientT = TypeVar("_HttpxClientT", bound=Union[httpx.Client, httpx.AsyncClient]) _DefaultStreamT = TypeVar("_DefaultStreamT", bound=Union[Stream[Any], AsyncStream[Any]]) class BaseBedrockClient(BaseClient[_HttpxClientT, _DefaultStreamT]): @override def _build_request( self, options: FinalRequestOptions, ) -> httpx.Request: if is_dict(options.json_data): options.json_data.setdefault("anthropic_version", DEFAULT_VERSION) if options.url in {"/v1/complete", "/v1/messages"} and options.method == "post": if not is_dict(options.json_data): raise RuntimeError("Expected dictionary json_data for post /completions endpoint") model = options.json_data.pop("model", None) stream = options.json_data.pop("stream", False) if stream: options.url = f"/model/{model}/invoke-with-response-stream" else: options.url = f"/model/{model}/invoke" return super()._build_request(options) @override def _make_status_error( self, err_msg: str, *, body: object, response: httpx.Response, ) -> APIStatusError: if response.status_code == 400: return _exceptions.BadRequestError(err_msg, response=response, body=body) if response.status_code == 401: return _exceptions.AuthenticationError(err_msg, response=response, body=body) if response.status_code == 403: return _exceptions.PermissionDeniedError(err_msg, response=response, body=body) if response.status_code == 404: return _exceptions.NotFoundError(err_msg, response=response, body=body) if response.status_code == 409: return _exceptions.ConflictError(err_msg, response=response, body=body) if response.status_code == 422: return _exceptions.UnprocessableEntityError(err_msg, response=response, body=body) if response.status_code == 429: return _exceptions.RateLimitError(err_msg, response=response, body=body) if response.status_code >= 500: return _exceptions.InternalServerError(err_msg, response=response, body=body) return APIStatusError(err_msg, response=response, body=body) class AnthropicBedrock(BaseBedrockClient[httpx.Client, Stream[Any]], SyncAPIClient): messages: Messages completions: Completions def __init__( self, aws_secret_key: str | None = None, aws_access_key: str | None = None, aws_region: str | None = None, aws_session_token: str | None = None, base_url: str | httpx.URL | None = None, timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, max_retries: int = DEFAULT_MAX_RETRIES, default_headers: Mapping[str, str] | None = None, default_query: Mapping[str, object] | None = None, # Configure a custom httpx client. See the [httpx documentation](https://www.python-httpx.org/api/#client) for more details. http_client: httpx.Client | None = None, # Enable or disable schema validation for data returned by the API. # When enabled an error APIResponseValidationError is raised # if the API responds with invalid data for the expected schema. # # This parameter may be removed or changed in the future. # If you rely on this feature, please open a GitHub issue # outlining your use-case to help us decide if it should be # part of our public interface in the future. _strict_response_validation: bool = False, ) -> None: self.aws_secret_key = aws_secret_key self.aws_access_key = aws_access_key if aws_region is None: aws_region = os.environ.get("AWS_REGION") or "us-east-1" self.aws_region = aws_region self.aws_session_token = aws_session_token if base_url is None: base_url = os.environ.get("ANTHROPIC_BEDROCK_BASE_URL") if base_url is None: base_url = f"https://bedrock-runtime.{self.aws_region}.amazonaws.com" super().__init__( version=__version__, base_url=base_url, timeout=timeout, max_retries=max_retries, custom_headers=default_headers, custom_query=default_query, http_client=http_client, _strict_response_validation=_strict_response_validation, ) self.messages = Messages(self) self.completions = Completions(self) @override def _make_sse_decoder(self) -> AWSEventStreamDecoder: return AWSEventStreamDecoder() @override def _prepare_request(self, request: httpx.Request) -> None: from ._auth import get_auth_headers data = request.read().decode() headers = get_auth_headers( method=request.method, url=str(request.url), headers=request.headers, aws_access_key=self.aws_access_key, aws_secret_key=self.aws_secret_key, aws_session_token=self.aws_session_token, region=self.aws_region or "us-east-1", data=data, ) request.headers.update(headers) def copy( self, *, aws_secret_key: str | None = None, aws_access_key: str | None = None, aws_region: str | None = None, aws_session_token: str | None = None, base_url: str | httpx.URL | None = None, timeout: float | Timeout | None | NotGiven = NOT_GIVEN, http_client: httpx.Client | None = None, max_retries: int | NotGiven = NOT_GIVEN, default_headers: Mapping[str, str] | None = None, set_default_headers: Mapping[str, str] | None = None, default_query: Mapping[str, object] | None = None, set_default_query: Mapping[str, object] | None = None, _extra_kwargs: Mapping[str, Any] = {}, ) -> Self: """ Create a new client instance re-using the same options given to the current client with optional overriding. """ if default_headers is not None and set_default_headers is not None: raise ValueError("The `default_headers` and `set_default_headers` arguments are mutually exclusive") if default_query is not None and set_default_query is not None: raise ValueError("The `default_query` and `set_default_query` arguments are mutually exclusive") headers = self._custom_headers if default_headers is not None: headers = {**headers, **default_headers} elif set_default_headers is not None: headers = set_default_headers params = self._custom_query if default_query is not None: params = {**params, **default_query} elif set_default_query is not None: params = set_default_query return self.__class__( aws_secret_key=aws_secret_key or self.aws_secret_key, aws_access_key=aws_access_key or self.aws_access_key, aws_region=aws_region or self.aws_region, aws_session_token=aws_session_token or self.aws_session_token, base_url=base_url or self.base_url, timeout=self.timeout if isinstance(timeout, NotGiven) else timeout, http_client=http_client, max_retries=max_retries if is_given(max_retries) else self.max_retries, default_headers=headers, default_query=params, **_extra_kwargs, ) # Alias for `copy` for nicer inline usage, e.g. # client.with_options(timeout=10).foo.create(...) with_options = copy class AsyncAnthropicBedrock(BaseBedrockClient[httpx.AsyncClient, AsyncStream[Any]], AsyncAPIClient): messages: AsyncMessages completions: AsyncCompletions def __init__( self, aws_secret_key: str | None = None, aws_access_key: str | None = None, aws_region: str | None = None, aws_session_token: str | None = None, base_url: str | httpx.URL | None = None, timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, max_retries: int = DEFAULT_MAX_RETRIES, default_headers: Mapping[str, str] | None = None, default_query: Mapping[str, object] | None = None, # Configure a custom httpx client. See the [httpx documentation](https://www.python-httpx.org/api/#client) for more details. http_client: httpx.AsyncClient | None = None, # Enable or disable schema validation for data returned by the API. # When enabled an error APIResponseValidationError is raised # if the API responds with invalid data for the expected schema. # # This parameter may be removed or changed in the future. # If you rely on this feature, please open a GitHub issue # outlining your use-case to help us decide if it should be # part of our public interface in the future. _strict_response_validation: bool = False, ) -> None: self.aws_secret_key = aws_secret_key self.aws_access_key = aws_access_key if aws_region is None: aws_region = os.environ.get("AWS_REGION") or "us-east-1" self.aws_region = aws_region self.aws_session_token = aws_session_token if base_url is None: base_url = os.environ.get("ANTHROPIC_BEDROCK_BASE_URL") if base_url is None: base_url = f"https://bedrock-runtime.{self.aws_region}.amazonaws.com" super().__init__( version=__version__, base_url=base_url, timeout=timeout, max_retries=max_retries, custom_headers=default_headers, custom_query=default_query, http_client=http_client, _strict_response_validation=_strict_response_validation, ) self.messages = AsyncMessages(self) self.completions = AsyncCompletions(self) @override def _make_sse_decoder(self) -> AWSEventStreamDecoder: return AWSEventStreamDecoder() @override async def _prepare_request(self, request: httpx.Request) -> None: from ._auth import get_auth_headers data = request.read().decode() headers = get_auth_headers( method=request.method, url=str(request.url), headers=request.headers, aws_access_key=self.aws_access_key, aws_secret_key=self.aws_secret_key, aws_session_token=self.aws_session_token, region=self.aws_region or "us-east-1", data=data, ) request.headers.update(headers) def copy( self, *, aws_secret_key: str | None = None, aws_access_key: str | None = None, aws_region: str | None = None, aws_session_token: str | None = None, base_url: str | httpx.URL | None = None, timeout: float | Timeout | None | NotGiven = NOT_GIVEN, http_client: httpx.AsyncClient | None = None, max_retries: int | NotGiven = NOT_GIVEN, default_headers: Mapping[str, str] | None = None, set_default_headers: Mapping[str, str] | None = None, default_query: Mapping[str, object] | None = None, set_default_query: Mapping[str, object] | None = None, _extra_kwargs: Mapping[str, Any] = {}, ) -> Self: """ Create a new client instance re-using the same options given to the current client with optional overriding. """ if default_headers is not None and set_default_headers is not None: raise ValueError("The `default_headers` and `set_default_headers` arguments are mutually exclusive") if default_query is not None and set_default_query is not None: raise ValueError("The `default_query` and `set_default_query` arguments are mutually exclusive") headers = self._custom_headers if default_headers is not None: headers = {**headers, **default_headers} elif set_default_headers is not None: headers = set_default_headers params = self._custom_query if default_query is not None: params = {**params, **default_query} elif set_default_query is not None: params = set_default_query return self.__class__( aws_secret_key=aws_secret_key or self.aws_secret_key, aws_access_key=aws_access_key or self.aws_access_key, aws_region=aws_region or self.aws_region, aws_session_token=aws_session_token or self.aws_session_token, base_url=base_url or self.base_url, timeout=self.timeout if isinstance(timeout, NotGiven) else timeout, http_client=http_client, max_retries=max_retries if is_given(max_retries) else self.max_retries, default_headers=headers, default_query=params, **_extra_kwargs, ) # Alias for `copy` for nicer inline usage, e.g. # client.with_options(timeout=10).foo.create(...) with_options = copy