# ClickUp Operator
# by noahvanhart
# Path: .venv / Lib / site-packages / anthropic
# File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details.
from __future__ import annotations
import os
from typing import Any, Union, Mapping
from typing_extensions import Self, override
import httpx
from . import resources, _constants, _exceptions
from ._qs import Querystring
from ._types import (
NOT_GIVEN,
Omit,
Headers,
Timeout,
NotGiven,
Transport,
ProxiesTypes,
AsyncTransport,
RequestOptions,
)
from ._utils import (
is_given,
get_async_library,
)
from ._version import __version__
from ._streaming import Stream as Stream, AsyncStream as AsyncStream
from ._exceptions import APIStatusError
from ._tokenizers import (
TokenizerType, # type: ignore[import]
sync_get_tokenizer,
async_get_tokenizer,
)
from ._base_client import (
DEFAULT_MAX_RETRIES,
DEFAULT_CONNECTION_LIMITS,
SyncAPIClient,
AsyncAPIClient,
SyncHttpxClientWrapper,
AsyncHttpxClientWrapper,
)
# Names exported by `from <this module> import *`: the re-exported option
# types, the `resources` subpackage, the two client classes defined in this
# file, and their `Client` / `AsyncClient` aliases.
__all__ = [
"Timeout",
"Transport",
"ProxiesTypes",
"RequestOptions",
"resources",
"Anthropic",
"AsyncAnthropic",
"Client",
"AsyncClient",
]
class Anthropic(SyncAPIClient):
    """Synchronous client for the Anthropic API.

    Credentials are taken from the constructor arguments or, when those are
    omitted, from the `ANTHROPIC_API_KEY` / `ANTHROPIC_AUTH_TOKEN` environment
    variables. An API key is sent as the `X-Api-Key` header; if no API key is
    set, an auth token is sent as `Authorization: Bearer <token>`.

    Resource groups (`completions`, `messages`, `beta`) and the
    `with_raw_response` / `with_streaming_response` views are created in
    `__init__`.
    """

    completions: resources.Completions
    messages: resources.Messages
    beta: resources.Beta
    with_raw_response: AnthropicWithRawResponse
    with_streaming_response: AnthropicWithStreamedResponse

    # client options
    api_key: str | None
    auth_token: str | None

    # constants
    HUMAN_PROMPT = _constants.HUMAN_PROMPT
    AI_PROMPT = _constants.AI_PROMPT

    def __init__(
        self,
        *,
        api_key: str | None = None,
        auth_token: str | None = None,
        base_url: str | httpx.URL | None = None,
        timeout: Union[float, Timeout, None, NotGiven] = NOT_GIVEN,
        max_retries: int = DEFAULT_MAX_RETRIES,
        default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        # Configure a custom httpx client.
        # We provide a `DefaultHttpxClient` class that you can pass to retain the default values we use for `limits`, `timeout` & `follow_redirects`.
        # See the [httpx documentation](https://www.python-httpx.org/api/#client) for more details.
        http_client: httpx.Client | None = None,
        # See httpx documentation for [custom transports](https://www.python-httpx.org/advanced/#custom-transports)
        transport: Transport | None = None,
        # See httpx documentation for [proxies](https://www.python-httpx.org/advanced/#http-proxying)
        proxies: ProxiesTypes | None = None,
        # See httpx documentation for [limits](https://www.python-httpx.org/advanced/#pool-limit-configuration)
        connection_pool_limits: httpx.Limits | None = None,
        # Enable or disable schema validation for data returned by the API.
        # When enabled an error APIResponseValidationError is raised
        # if the API responds with invalid data for the expected schema.
        #
        # This parameter may be removed or changed in the future.
        # If you rely on this feature, please open a GitHub issue
        # outlining your use-case to help us decide if it should be
        # part of our public interface in the future.
        _strict_response_validation: bool = False,
    ) -> None:
        """Construct a new synchronous anthropic client instance.

        This automatically infers the following arguments from their corresponding environment variables if they are not provided:
        - `api_key` from `ANTHROPIC_API_KEY`
        - `auth_token` from `ANTHROPIC_AUTH_TOKEN`
        - `base_url` from `ANTHROPIC_BASE_URL` (falling back to `https://api.anthropic.com`)
        """
        if api_key is None:
            api_key = os.environ.get("ANTHROPIC_API_KEY")
        self.api_key = api_key

        if auth_token is None:
            auth_token = os.environ.get("ANTHROPIC_AUTH_TOKEN")
        self.auth_token = auth_token

        # Precedence: explicit argument, then environment, then the public endpoint.
        if base_url is None:
            base_url = os.environ.get("ANTHROPIC_BASE_URL")
        if base_url is None:
            base_url = "https://api.anthropic.com"

        super().__init__(
            version=__version__,
            base_url=base_url,
            max_retries=max_retries,
            timeout=timeout,
            http_client=http_client,
            transport=transport,
            proxies=proxies,
            limits=connection_pool_limits,
            custom_headers=default_headers,
            custom_query=default_query,
            _strict_response_validation=_strict_response_validation,
        )

        self._default_stream_cls = Stream

        self.completions = resources.Completions(self)
        self.messages = resources.Messages(self)
        self.beta = resources.Beta(self)
        self.with_raw_response = AnthropicWithRawResponse(self)
        self.with_streaming_response = AnthropicWithStreamedResponse(self)

    @property
    @override
    def qs(self) -> Querystring:
        """Query-string serializer configured with the `comma` array format."""
        return Querystring(array_format="comma")

    @property
    @override
    def auth_headers(self) -> dict[str, str]:
        """Return the authentication header to send.

        The API key header wins when available; otherwise the bearer token
        header is used; otherwise an empty dict is returned.
        """
        # Read each property once: every access builds a fresh dict.
        api_key_auth = self._api_key_auth
        if api_key_auth:
            return api_key_auth

        bearer_auth = self._bearer_auth
        if bearer_auth:
            return bearer_auth

        return {}

    @property
    def _api_key_auth(self) -> dict[str, str]:
        """`X-Api-Key` header for `self.api_key`, or `{}` when it is None."""
        api_key = self.api_key
        if api_key is None:
            return {}
        return {"X-Api-Key": api_key}

    @property
    def _bearer_auth(self) -> dict[str, str]:
        """`Authorization: Bearer` header for `self.auth_token`, or `{}` when it is None."""
        auth_token = self.auth_token
        if auth_token is None:
            return {}
        return {"Authorization": f"Bearer {auth_token}"}

    @property
    @override
    def default_headers(self) -> dict[str, str | Omit]:
        """Headers sent by default.

        Starts from the base client's defaults, adds the sync marker and the
        API version, then applies the user's custom headers last so they can
        override any of the above.
        """
        return {
            **super().default_headers,
            "X-Stainless-Async": "false",
            "anthropic-version": "2023-06-01",
            **self._custom_headers,
        }

    @override
    def _validate_headers(self, headers: Headers, custom_headers: Headers) -> None:
        """Ensure an authentication method can be resolved.

        Passes when a configured credential has its header present in
        `headers`, or when the caller explicitly omitted `X-Api-Key` or
        `Authorization` via an `Omit` value in `custom_headers`.

        Raises:
            TypeError: if none of those conditions hold.
        """
        if self.api_key and headers.get("X-Api-Key"):
            return
        if isinstance(custom_headers.get("X-Api-Key"), Omit):
            return

        if self.auth_token and headers.get("Authorization"):
            return
        if isinstance(custom_headers.get("Authorization"), Omit):
            return

        # NOTE: the message text (including its embedded double quotes) is kept verbatim.
        raise TypeError(
            '"Could not resolve authentication method. Expected either api_key or auth_token to be set. Or for one of the `X-Api-Key` or `Authorization` headers to be explicitly omitted"'
        )

    def copy(
        self,
        *,
        api_key: str | None = None,
        auth_token: str | None = None,
        base_url: str | httpx.URL | None = None,
        timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
        http_client: httpx.Client | None = None,
        connection_pool_limits: httpx.Limits | None = None,
        max_retries: int | NotGiven = NOT_GIVEN,
        default_headers: Mapping[str, str] | None = None,
        set_default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        set_default_query: Mapping[str, object] | None = None,
        # Read-only: only ever unpacked with `**`, never mutated.
        _extra_kwargs: Mapping[str, Any] = {},
    ) -> Self:
        """
        Create a new client instance re-using the same options given to the current client with optional overriding.

        `default_headers` / `default_query` are merged over the current custom
        values, while `set_default_headers` / `set_default_query` replace them
        outright.

        Raises:
            ValueError: if both members of a `default_*` / `set_default_*`
                pair are given, if `http_client` is combined with
                `connection_pool_limits`, or if `connection_pool_limits` is
                given while this client uses a custom HTTP client.
        """
        if default_headers is not None and set_default_headers is not None:
            raise ValueError("The `default_headers` and `set_default_headers` arguments are mutually exclusive")

        if default_query is not None and set_default_query is not None:
            raise ValueError("The `default_query` and `set_default_query` arguments are mutually exclusive")

        headers = self._custom_headers
        if default_headers is not None:
            headers = {**headers, **default_headers}
        elif set_default_headers is not None:
            headers = set_default_headers

        params = self._custom_query
        if default_query is not None:
            params = {**params, **default_query}
        elif set_default_query is not None:
            params = set_default_query

        if connection_pool_limits is not None:
            if http_client is not None:
                raise ValueError("The 'http_client' argument is mutually exclusive with 'connection_pool_limits'")

            if not isinstance(self._client, SyncHttpxClientWrapper):
                raise ValueError(
                    "A custom HTTP client has been set and is mutually exclusive with the 'connection_pool_limits' argument"
                )

            # New limits were requested, so the existing HTTP client is not reused.
            http_client = None
        else:
            # Carry over non-default limits from this client.
            if self._limits is not DEFAULT_CONNECTION_LIMITS:
                connection_pool_limits = self._limits
            else:
                connection_pool_limits = None

            http_client = http_client or self._client

        return self.__class__(
            api_key=api_key or self.api_key,
            auth_token=auth_token or self.auth_token,
            base_url=base_url or self.base_url,
            timeout=self.timeout if isinstance(timeout, NotGiven) else timeout,
            http_client=http_client,
            connection_pool_limits=connection_pool_limits,
            max_retries=max_retries if is_given(max_retries) else self.max_retries,
            default_headers=headers,
            default_query=params,
            **_extra_kwargs,
        )

    # Alias for `copy` for nicer inline usage, e.g.
    # client.with_options(timeout=10).foo.create(...)
    with_options = copy

    def count_tokens(
        self,
        text: str,
    ) -> int:
        """Count the number of tokens in a given string.

        Note that this is only accurate for older models, e.g. `claude-2.1`. For newer
        models this can only be used as a _very_ rough estimate, instead you should rely
        on the `usage` property in the response for exact counts.
        """
        # Note: tokenizer is untyped
        tokenizer = self.get_tokenizer()
        encoded_text = tokenizer.encode(text)  # type: ignore
        return len(encoded_text.ids)  # type: ignore

    def get_tokenizer(self) -> TokenizerType:
        """Return the tokenizer obtained from `sync_get_tokenizer()`."""
        return sync_get_tokenizer()

    @override
    def _make_status_error(
        self,
        err_msg: str,
        *,
        body: object,
        response: httpx.Response,
    ) -> APIStatusError:
        """Build the exception matching `response.status_code`.

        400, 401, 403, 404, 409, 422 and 429 each map to a dedicated
        `_exceptions` class; any status >= 500 maps to `InternalServerError`;
        everything else yields a plain `APIStatusError`.
        """
        if response.status_code == 400:
            return _exceptions.BadRequestError(err_msg, response=response, body=body)

        if response.status_code == 401:
            return _exceptions.AuthenticationError(err_msg, response=response, body=body)

        if response.status_code == 403:
            return _exceptions.PermissionDeniedError(err_msg, response=response, body=body)

        if response.status_code == 404:
            return _exceptions.NotFoundError(err_msg, response=response, body=body)

        if response.status_code == 409:
            return _exceptions.ConflictError(err_msg, response=response, body=body)

        if response.status_code == 422:
            return _exceptions.UnprocessableEntityError(err_msg, response=response, body=body)

        if response.status_code == 429:
            return _exceptions.RateLimitError(err_msg, response=response, body=body)

        if response.status_code >= 500:
            return _exceptions.InternalServerError(err_msg, response=response, body=body)
        return APIStatusError(err_msg, response=response, body=body)
class AsyncAnthropic(AsyncAPIClient):
    """Asynchronous client for the Anthropic API.

    Credentials are taken from the constructor arguments or, when those are
    omitted, from the `ANTHROPIC_API_KEY` / `ANTHROPIC_AUTH_TOKEN` environment
    variables. An API key is sent as the `X-Api-Key` header; if no API key is
    set, an auth token is sent as `Authorization: Bearer <token>`.

    Resource groups (`completions`, `messages`, `beta`) and the
    `with_raw_response` / `with_streaming_response` views are created in
    `__init__`.
    """

    completions: resources.AsyncCompletions
    messages: resources.AsyncMessages
    beta: resources.AsyncBeta
    with_raw_response: AsyncAnthropicWithRawResponse
    with_streaming_response: AsyncAnthropicWithStreamedResponse

    # client options
    api_key: str | None
    auth_token: str | None

    # constants
    HUMAN_PROMPT = _constants.HUMAN_PROMPT
    AI_PROMPT = _constants.AI_PROMPT

    def __init__(
        self,
        *,
        api_key: str | None = None,
        auth_token: str | None = None,
        base_url: str | httpx.URL | None = None,
        timeout: Union[float, Timeout, None, NotGiven] = NOT_GIVEN,
        max_retries: int = DEFAULT_MAX_RETRIES,
        default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        # Configure a custom httpx client.
        # We provide a `DefaultAsyncHttpxClient` class that you can pass to retain the default values we use for `limits`, `timeout` & `follow_redirects`.
        # See the [httpx documentation](https://www.python-httpx.org/api/#asyncclient) for more details.
        http_client: httpx.AsyncClient | None = None,
        # See httpx documentation for [custom transports](https://www.python-httpx.org/advanced/#custom-transports)
        transport: AsyncTransport | None = None,
        # See httpx documentation for [proxies](https://www.python-httpx.org/advanced/#http-proxying)
        proxies: ProxiesTypes | None = None,
        # See httpx documentation for [limits](https://www.python-httpx.org/advanced/#pool-limit-configuration)
        connection_pool_limits: httpx.Limits | None = None,
        # Enable or disable schema validation for data returned by the API.
        # When enabled an error APIResponseValidationError is raised
        # if the API responds with invalid data for the expected schema.
        #
        # This parameter may be removed or changed in the future.
        # If you rely on this feature, please open a GitHub issue
        # outlining your use-case to help us decide if it should be
        # part of our public interface in the future.
        _strict_response_validation: bool = False,
    ) -> None:
        """Construct a new async anthropic client instance.

        This automatically infers the following arguments from their corresponding environment variables if they are not provided:
        - `api_key` from `ANTHROPIC_API_KEY`
        - `auth_token` from `ANTHROPIC_AUTH_TOKEN`
        - `base_url` from `ANTHROPIC_BASE_URL` (falling back to `https://api.anthropic.com`)
        """
        if api_key is None:
            api_key = os.environ.get("ANTHROPIC_API_KEY")
        self.api_key = api_key

        if auth_token is None:
            auth_token = os.environ.get("ANTHROPIC_AUTH_TOKEN")
        self.auth_token = auth_token

        # Precedence: explicit argument, then environment, then the public endpoint.
        if base_url is None:
            base_url = os.environ.get("ANTHROPIC_BASE_URL")
        if base_url is None:
            base_url = "https://api.anthropic.com"

        super().__init__(
            version=__version__,
            base_url=base_url,
            max_retries=max_retries,
            timeout=timeout,
            http_client=http_client,
            transport=transport,
            proxies=proxies,
            limits=connection_pool_limits,
            custom_headers=default_headers,
            custom_query=default_query,
            _strict_response_validation=_strict_response_validation,
        )

        self._default_stream_cls = AsyncStream

        self.completions = resources.AsyncCompletions(self)
        self.messages = resources.AsyncMessages(self)
        self.beta = resources.AsyncBeta(self)
        self.with_raw_response = AsyncAnthropicWithRawResponse(self)
        self.with_streaming_response = AsyncAnthropicWithStreamedResponse(self)

    @property
    @override
    def qs(self) -> Querystring:
        """Query-string serializer configured with the `comma` array format."""
        return Querystring(array_format="comma")

    @property
    @override
    def auth_headers(self) -> dict[str, str]:
        """Return the authentication header to send.

        The API key header wins when available; otherwise the bearer token
        header is used; otherwise an empty dict is returned.
        """
        # Read each property once: every access builds a fresh dict.
        api_key_auth = self._api_key_auth
        if api_key_auth:
            return api_key_auth

        bearer_auth = self._bearer_auth
        if bearer_auth:
            return bearer_auth

        return {}

    @property
    def _api_key_auth(self) -> dict[str, str]:
        """`X-Api-Key` header for `self.api_key`, or `{}` when it is None."""
        api_key = self.api_key
        if api_key is None:
            return {}
        return {"X-Api-Key": api_key}

    @property
    def _bearer_auth(self) -> dict[str, str]:
        """`Authorization: Bearer` header for `self.auth_token`, or `{}` when it is None."""
        auth_token = self.auth_token
        if auth_token is None:
            return {}
        return {"Authorization": f"Bearer {auth_token}"}

    @property
    @override
    def default_headers(self) -> dict[str, str | Omit]:
        """Headers sent by default.

        Starts from the base client's defaults, adds an `async:<library>`
        marker built from `get_async_library()` and the API version, then
        applies the user's custom headers last so they can override any of
        the above.
        """
        return {
            **super().default_headers,
            "X-Stainless-Async": f"async:{get_async_library()}",
            "anthropic-version": "2023-06-01",
            **self._custom_headers,
        }

    @override
    def _validate_headers(self, headers: Headers, custom_headers: Headers) -> None:
        """Ensure an authentication method can be resolved.

        Passes when a configured credential has its header present in
        `headers`, or when the caller explicitly omitted `X-Api-Key` or
        `Authorization` via an `Omit` value in `custom_headers`.

        Raises:
            TypeError: if none of those conditions hold.
        """
        if self.api_key and headers.get("X-Api-Key"):
            return
        if isinstance(custom_headers.get("X-Api-Key"), Omit):
            return

        if self.auth_token and headers.get("Authorization"):
            return
        if isinstance(custom_headers.get("Authorization"), Omit):
            return

        # NOTE: the message text (including its embedded double quotes) is kept verbatim.
        raise TypeError(
            '"Could not resolve authentication method. Expected either api_key or auth_token to be set. Or for one of the `X-Api-Key` or `Authorization` headers to be explicitly omitted"'
        )

    def copy(
        self,
        *,
        api_key: str | None = None,
        auth_token: str | None = None,
        base_url: str | httpx.URL | None = None,
        timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
        http_client: httpx.AsyncClient | None = None,
        connection_pool_limits: httpx.Limits | None = None,
        max_retries: int | NotGiven = NOT_GIVEN,
        default_headers: Mapping[str, str] | None = None,
        set_default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        set_default_query: Mapping[str, object] | None = None,
        # Read-only: only ever unpacked with `**`, never mutated.
        _extra_kwargs: Mapping[str, Any] = {},
    ) -> Self:
        """
        Create a new client instance re-using the same options given to the current client with optional overriding.

        `default_headers` / `default_query` are merged over the current custom
        values, while `set_default_headers` / `set_default_query` replace them
        outright.

        Raises:
            ValueError: if both members of a `default_*` / `set_default_*`
                pair are given, if `http_client` is combined with
                `connection_pool_limits`, or if `connection_pool_limits` is
                given while this client uses a custom HTTP client.
        """
        if default_headers is not None and set_default_headers is not None:
            raise ValueError("The `default_headers` and `set_default_headers` arguments are mutually exclusive")

        if default_query is not None and set_default_query is not None:
            raise ValueError("The `default_query` and `set_default_query` arguments are mutually exclusive")

        headers = self._custom_headers
        if default_headers is not None:
            headers = {**headers, **default_headers}
        elif set_default_headers is not None:
            headers = set_default_headers

        params = self._custom_query
        if default_query is not None:
            params = {**params, **default_query}
        elif set_default_query is not None:
            params = set_default_query

        if connection_pool_limits is not None:
            if http_client is not None:
                raise ValueError("The 'http_client' argument is mutually exclusive with 'connection_pool_limits'")

            if not isinstance(self._client, AsyncHttpxClientWrapper):
                raise ValueError(
                    "A custom HTTP client has been set and is mutually exclusive with the 'connection_pool_limits' argument"
                )

            # New limits were requested, so the existing HTTP client is not reused.
            http_client = None
        else:
            # Carry over non-default limits from this client.
            if self._limits is not DEFAULT_CONNECTION_LIMITS:
                connection_pool_limits = self._limits
            else:
                connection_pool_limits = None

            http_client = http_client or self._client

        return self.__class__(
            api_key=api_key or self.api_key,
            auth_token=auth_token or self.auth_token,
            base_url=base_url or self.base_url,
            timeout=self.timeout if isinstance(timeout, NotGiven) else timeout,
            http_client=http_client,
            connection_pool_limits=connection_pool_limits,
            max_retries=max_retries if is_given(max_retries) else self.max_retries,
            default_headers=headers,
            default_query=params,
            **_extra_kwargs,
        )

    # Alias for `copy` for nicer inline usage, e.g.
    # client.with_options(timeout=10).foo.create(...)
    with_options = copy

    async def count_tokens(
        self,
        text: str,
    ) -> int:
        """Count the number of tokens in a given string.

        Note that this is only accurate for older models, e.g. `claude-2.1`. For newer
        models this can only be used as a _very_ rough estimate, instead you should rely
        on the `usage` property in the response for exact counts.
        """
        # Note: tokenizer is untyped
        tokenizer = await self.get_tokenizer()
        encoded_text = tokenizer.encode(text)  # type: ignore
        return len(encoded_text.ids)  # type: ignore

    async def get_tokenizer(self) -> TokenizerType:
        """Return the tokenizer obtained by awaiting `async_get_tokenizer()`."""
        return await async_get_tokenizer()

    @override
    def _make_status_error(
        self,
        err_msg: str,
        *,
        body: object,
        response: httpx.Response,
    ) -> APIStatusError:
        """Build the exception matching `response.status_code`.

        400, 401, 403, 404, 409, 422 and 429 each map to a dedicated
        `_exceptions` class; any status >= 500 maps to `InternalServerError`;
        everything else yields a plain `APIStatusError`.
        """
        if response.status_code == 400:
            return _exceptions.BadRequestError(err_msg, response=response, body=body)

        if response.status_code == 401:
            return _exceptions.AuthenticationError(err_msg, response=response, body=body)

        if response.status_code == 403:
            return _exceptions.PermissionDeniedError(err_msg, response=response, body=body)

        if response.status_code == 404:
            return _exceptions.NotFoundError(err_msg, response=response, body=body)

        if response.status_code == 409:
            return _exceptions.ConflictError(err_msg, response=response, body=body)

        if response.status_code == 422:
            return _exceptions.UnprocessableEntityError(err_msg, response=response, body=body)

        if response.status_code == 429:
            return _exceptions.RateLimitError(err_msg, response=response, body=body)

        if response.status_code >= 500:
            return _exceptions.InternalServerError(err_msg, response=response, body=body)
        return APIStatusError(err_msg, response=response, body=body)
class AnthropicWithRawResponse:
    """View of a sync `Anthropic` client built from `*WithRawResponse` resources."""

    def __init__(self, client: Anthropic) -> None:
        """Wrap each resource group of `client` in its `WithRawResponse` counterpart."""
        # Same attribute names as on the client, so call sites mirror it.
        self.completions = resources.CompletionsWithRawResponse(client.completions)
        self.messages = resources.MessagesWithRawResponse(client.messages)
        self.beta = resources.BetaWithRawResponse(client.beta)
class AsyncAnthropicWithRawResponse:
    """View of an `AsyncAnthropic` client built from `Async*WithRawResponse` resources."""

    def __init__(self, client: AsyncAnthropic) -> None:
        """Wrap each resource group of `client` in its `WithRawResponse` counterpart."""
        # Same attribute names as on the client, so call sites mirror it.
        self.completions = resources.AsyncCompletionsWithRawResponse(client.completions)
        self.messages = resources.AsyncMessagesWithRawResponse(client.messages)
        self.beta = resources.AsyncBetaWithRawResponse(client.beta)
class AnthropicWithStreamedResponse:
    """View of a sync `Anthropic` client built from `*WithStreamingResponse` resources."""

    def __init__(self, client: Anthropic) -> None:
        """Wrap each resource group of `client` in its `WithStreamingResponse` counterpart."""
        # Same attribute names as on the client, so call sites mirror it.
        self.completions = resources.CompletionsWithStreamingResponse(client.completions)
        self.messages = resources.MessagesWithStreamingResponse(client.messages)
        self.beta = resources.BetaWithStreamingResponse(client.beta)
class AsyncAnthropicWithStreamedResponse:
    """View of an `AsyncAnthropic` client built from `Async*WithStreamingResponse` resources."""

    def __init__(self, client: AsyncAnthropic) -> None:
        """Wrap each resource group of `client` in its `WithStreamingResponse` counterpart."""
        # Same attribute names as on the client, so call sites mirror it.
        self.completions = resources.AsyncCompletionsWithStreamingResponse(client.completions)
        self.messages = resources.AsyncMessagesWithStreamingResponse(client.messages)
        self.beta = resources.AsyncBetaWithStreamingResponse(client.beta)
# Alternative names for the two client classes; both are listed in `__all__`.
# Kept as plain `Name = Class` assignments so each alias refers to the very
# same class object.
Client = Anthropic
AsyncClient = AsyncAnthropic