jsonl.py•3.51 kB
from __future__ import annotations
import json
from typing_extensions import Generic, TypeVar, Iterator, AsyncIterator
import httpx
from .._models import construct_type_unchecked
_T = TypeVar("_T")
class JSONLDecoder(Generic[_T]):
"""A decoder for [JSON Lines](https://jsonlines.org) format.
This class provides an iterator over a byte-iterator that parses each JSON Line
into a given type.
"""
http_response: httpx.Response
"""The HTTP response this decoder was constructed from"""
def __init__(
self,
*,
raw_iterator: Iterator[bytes],
line_type: type[_T],
http_response: httpx.Response,
) -> None:
super().__init__()
self.http_response = http_response
self._raw_iterator = raw_iterator
self._line_type = line_type
self._iterator = self.__decode__()
def close(self) -> None:
"""Close the response body stream.
This is called automatically if you consume the entire stream.
"""
self.http_response.close()
def __decode__(self) -> Iterator[_T]:
buf = b""
for chunk in self._raw_iterator:
for line in chunk.splitlines(keepends=True):
buf += line
if buf.endswith((b"\r", b"\n", b"\r\n")):
yield construct_type_unchecked(
value=json.loads(buf),
type_=self._line_type,
)
buf = b""
# flush
if buf:
yield construct_type_unchecked(
value=json.loads(buf),
type_=self._line_type,
)
def __next__(self) -> _T:
return self._iterator.__next__()
def __iter__(self) -> Iterator[_T]:
for item in self._iterator:
yield item
class AsyncJSONLDecoder(Generic[_T]):
"""A decoder for [JSON Lines](https://jsonlines.org) format.
This class provides an async iterator over a byte-iterator that parses each JSON Line
into a given type.
"""
http_response: httpx.Response
def __init__(
self,
*,
raw_iterator: AsyncIterator[bytes],
line_type: type[_T],
http_response: httpx.Response,
) -> None:
super().__init__()
self.http_response = http_response
self._raw_iterator = raw_iterator
self._line_type = line_type
self._iterator = self.__decode__()
async def close(self) -> None:
"""Close the response body stream.
This is called automatically if you consume the entire stream.
"""
await self.http_response.aclose()
async def __decode__(self) -> AsyncIterator[_T]:
buf = b""
async for chunk in self._raw_iterator:
for line in chunk.splitlines(keepends=True):
buf += line
if buf.endswith((b"\r", b"\n", b"\r\n")):
yield construct_type_unchecked(
value=json.loads(buf),
type_=self._line_type,
)
buf = b""
# flush
if buf:
yield construct_type_unchecked(
value=json.loads(buf),
type_=self._line_type,
)
async def __anext__(self) -> _T:
return await self._iterator.__anext__()
async def __aiter__(self) -> AsyncIterator[_T]:
async for item in self._iterator:
yield item