_request_matcher.pyā¢8.12 kB
import json
import re
from typing import Optional, Union, Any
from re import Pattern
import httpx
from pytest_httpx._httpx_internals import _proxy_url
from pytest_httpx._options import _HTTPXMockOptions
def _url_match(
url_to_match: Union[Pattern[str], httpx.URL], received: httpx.URL
) -> bool:
if isinstance(url_to_match, re.Pattern):
return url_to_match.match(str(received)) is not None
# Compare query parameters apart as order of parameters should not matter
received_params = dict(received.params)
params = dict(url_to_match.params)
# Remove the query parameters from the original URL to compare everything besides query parameters
received_url = received.copy_with(query=None)
url = url_to_match.copy_with(query=None)
return (received_params == params) and (url == received_url)
class _RequestMatcher:
def __init__(
self,
options: _HTTPXMockOptions,
url: Optional[Union[str, Pattern[str], httpx.URL]] = None,
method: Optional[str] = None,
proxy_url: Optional[Union[str, Pattern[str], httpx.URL]] = None,
match_headers: Optional[dict[str, Any]] = None,
match_content: Optional[bytes] = None,
match_json: Optional[Any] = None,
match_data: Optional[dict[str, Any]] = None,
match_files: Optional[Any] = None,
match_extensions: Optional[dict[str, Any]] = None,
is_optional: Optional[bool] = None,
is_reusable: Optional[bool] = None,
):
self._options = options
self.nb_calls = 0
self.url = httpx.URL(url) if url and isinstance(url, str) else url
self.method = method.upper() if method else method
self.headers = match_headers
self.content = match_content
self.json = match_json
self.data = match_data
self.files = match_files
self.proxy_url = (
httpx.URL(proxy_url)
if proxy_url and isinstance(proxy_url, str)
else proxy_url
)
self.extensions = match_extensions
self.is_optional = not options.assert_all_responses_were_requested if is_optional is None else is_optional
self.is_reusable = options.can_send_already_matched_responses if is_reusable is None else is_reusable
if self._is_matching_body_more_than_one_way():
raise ValueError(
"Only one way of matching against the body can be provided. "
"If you want to match against the JSON decoded representation, use match_json. "
"If you want to match against the multipart representation, use match_files (and match_data). "
"Otherwise, use match_content."
)
if self.data and not self.files:
raise ValueError(
"match_data is meant to be used for multipart matching (in conjunction with match_files)."
"Use match_content to match url encoded data."
)
def expect_body(self) -> bool:
matching_ways = [
self.content is not None,
self.json is not None,
self.files is not None,
]
return sum(matching_ways) == 1
def _is_matching_body_more_than_one_way(self) -> bool:
matching_ways = [
self.content is not None,
self.json is not None,
self.files is not None,
]
return sum(matching_ways) > 1
def match(
self,
real_transport: Union[httpx.HTTPTransport, httpx.AsyncHTTPTransport],
request: httpx.Request,
) -> bool:
return (
self._url_match(request)
and self._method_match(request)
and self._headers_match(request)
and self._content_match(request)
and self._proxy_match(real_transport)
and self._extensions_match(request)
)
def _url_match(self, request: httpx.Request) -> bool:
if not self.url:
return True
return _url_match(self.url, request.url)
def _method_match(self, request: httpx.Request) -> bool:
if not self.method:
return True
return request.method == self.method
def _headers_match(self, request: httpx.Request) -> bool:
if not self.headers:
return True
encoding = request.headers.encoding
request_headers = {}
# Can be cleaned based on the outcome of https://github.com/encode/httpx/discussions/2841
for raw_name, raw_value in request.headers.raw:
if raw_name in request_headers:
request_headers[raw_name] += b", " + raw_value
else:
request_headers[raw_name] = raw_value
return all(
request_headers.get(header_name.encode(encoding))
== header_value.encode(encoding)
for header_name, header_value in self.headers.items()
)
def _content_match(self, request: httpx.Request) -> bool:
if self.content is not None:
return request.content == self.content
if self.json is not None:
try:
# httpx._content.encode_json hard codes utf-8 encoding.
return json.loads(request.content.decode("utf-8")) == self.json
except json.decoder.JSONDecodeError:
return False
if self.files:
if not (
boundary_matched := re.match(b"^--([0-9a-f]*)\r\n", request.content)
):
return False
# Ensure we re-use the same boundary for comparison
boundary = boundary_matched.group(1)
# Prevent internal httpx changes from impacting users not matching on files
from httpx._multipart import MultipartStream
multipart_content = b"".join(
MultipartStream(self.data or {}, self.files, boundary)
)
return request.content == multipart_content
return True
def _proxy_match(
self, real_transport: Union[httpx.HTTPTransport, httpx.AsyncHTTPTransport]
) -> bool:
if not self.proxy_url:
return True
if real_proxy_url := _proxy_url(real_transport):
return _url_match(self.proxy_url, real_proxy_url)
return False
def _extensions_match(self, request: httpx.Request) -> bool:
if not self.extensions:
return True
return all(
request.extensions.get(extension_name) == extension_value
for extension_name, extension_value in self.extensions.items()
)
def should_have_matched(self) -> bool:
"""Return True if the matcher did not serve its purpose."""
return not self.is_optional and not self.nb_calls
def __str__(self) -> str:
if self.is_reusable:
matcher_description = f"Match {self.method or 'every'} request"
else:
matcher_description = "Already matched" if self.nb_calls else "Match"
matcher_description += f" {self.method or 'any'} request"
if self.url:
matcher_description += f" on {self.url}"
if extra_description := self._extra_description():
matcher_description += f" with {extra_description}"
return matcher_description
def _extra_description(self) -> str:
extra_description = []
if self.headers:
extra_description.append(f"{self.headers} headers")
if self.content is not None:
extra_description.append(f"{self.content} body")
if self.json is not None:
extra_description.append(f"{self.json} json body")
if self.data is not None:
extra_description.append(f"{self.data} multipart data")
if self.files is not None:
extra_description.append(f"{self.files} files")
if self.proxy_url:
extra_description.append(f"{self.proxy_url} proxy URL")
if self.extensions:
extra_description.append(f"{self.extensions} extensions")
return " and ".join(extra_description)