# $Id: io.py 10146 2025-05-27 06:14:22Z milde $
# Author: David Goodger <goodger@python.org>
# Copyright: This module has been placed in the public domain.
"""
I/O classes provide a uniform API for low-level input and output. Subclasses
exist for a variety of input/output mechanisms.
"""
from __future__ import annotations
__docformat__ = 'reStructuredText'
import codecs
import locale
import os
import re
import sys
import warnings
from docutils import TransformSpec
TYPE_CHECKING = False
if TYPE_CHECKING:
from typing import Any, BinaryIO, ClassVar, Final, Literal, TextIO
from docutils import nodes
from docutils.nodes import StrPath
# Guess the locale's preferred encoding.
# If no valid guess can be made, _locale_encoding is set to `None`:
#
# TODO: check whether this is set correctly with every OS and Python version
# or whether front-end tools need to call `locale.setlocale()`
# before importing this module
try:
# Return locale encoding also in UTF-8 mode
with warnings.catch_warnings():
warnings.simplefilter("ignore")
_locale_encoding: str | None = (locale.getlocale()[1]
or locale.getdefaultlocale()[1]
).lower()
except: # NoQA: E722 (catchall)
# Any problem determining the locale: use None
_locale_encoding = None
try:
codecs.lookup(_locale_encoding)
except (LookupError, TypeError):
_locale_encoding = None
class InputError(OSError): pass
class OutputError(OSError): pass
def check_encoding(stream: TextIO, encoding: str) -> bool | None:
"""Test, whether the encoding of `stream` matches `encoding`.
Returns
:None: if `encoding` or `stream.encoding` are not a valid encoding
argument (e.g. ``None``) or `stream.encoding is missing.
:True: if the encoding argument resolves to the same value as `encoding`,
:False: if the encodings differ.
"""
try:
return codecs.lookup(stream.encoding) == codecs.lookup(encoding)
except (LookupError, AttributeError, TypeError):
return None
def error_string(err: BaseException) -> str:
"""Return string representation of Exception `err`.
"""
return f'{err.__class__.__name__}: {err}'
class Input(TransformSpec):
"""
Abstract base class for input wrappers.
Docutils input objects must provide a `read()` method that
returns the source, typically as `str` instance.
Inheriting `TransformSpec` allows input objects to add "transforms" to
the "Transformer". (Since Docutils 0.19, input objects are no longer
required to be `TransformSpec` instances.)
"""
component_type: Final = 'input'
default_source_path: ClassVar[str | None] = None
def __init__(
self,
source: str | TextIO | nodes.document | None = None,
source_path: StrPath | None = None,
encoding: str | Literal['unicode'] | None = 'utf-8',
error_handler: str | None = 'strict',
) -> None:
self.encoding = encoding
"""Text encoding for the input source."""
self.error_handler = error_handler
"""Text decoding error handler."""
self.source = source
"""The source of input data."""
self.source_path = source_path
"""A text reference to the source."""
if not source_path:
self.source_path = self.default_source_path
self.successful_encoding = None
"""The encoding that successfully decoded the source data."""
def __repr__(self) -> str:
return '%s: source=%r, source_path=%r' % (self.__class__, self.source,
self.source_path)
def read(self) -> str:
"""Return input as `str`. Define in subclasses."""
raise NotImplementedError
def decode(self, data: str | bytes) -> str:
"""
Decode `data` if required.
Return Unicode `str` instances unchanged (nothing to decode).
If `self.encoding` is None, determine encoding from data
or try UTF-8 and the locale's preferred encoding.
The client application should call ``locale.setlocale()`` at the
beginning of processing::
locale.setlocale(locale.LC_ALL, '')
Raise UnicodeError if unsuccessful.
Provisional: encoding detection will be removed in Docutils 1.0.
"""
if self.encoding and self.encoding.lower() == 'unicode':
assert isinstance(data, str), ('input encoding is "unicode" '
'but `data` is no `str` instance')
if isinstance(data, str):
# nothing to decode
return data
if self.encoding:
# We believe the user/application when the encoding is
# explicitly given.
encoding_candidates = [self.encoding]
else:
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=DeprecationWarning)
data_encoding = self.determine_encoding_from_data(data)
if data_encoding:
# `data` declares its encoding with "magic comment" or BOM,
encoding_candidates = [data_encoding]
else:
# Apply heuristics if the encoding is not specified.
# Start with UTF-8, because that only matches
# data that *IS* UTF-8:
encoding_candidates = ['utf-8']
# If UTF-8 fails, fall back to the locale's preferred encoding:
if sys.version_info[:2] >= (3, 11):
fallback = locale.getencoding()
else:
fallback = locale.getpreferredencoding(do_setlocale=False)
if fallback and fallback.lower() != 'utf-8':
encoding_candidates.append(fallback)
if not self.encoding and encoding_candidates[0] != 'utf-8':
warnings.warn('Input encoding auto-detection will be removed and '
'the encoding values None and "" become invalid '
'in Docutils 1.0.', DeprecationWarning, stacklevel=2)
for enc in encoding_candidates:
try:
decoded = str(data, enc, self.error_handler)
self.successful_encoding = enc
return decoded
except (UnicodeError, LookupError) as err:
# keep exception instance for use outside of the "for" loop.
error = err
raise UnicodeError(
'Unable to decode input data. Tried the following encodings: '
f'{", ".join(repr(enc) for enc in encoding_candidates)}.\n'
f'({error_string(error)})')
coding_slug: ClassVar[re.Pattern[bytes]] = re.compile(
br'coding[:=]\s*([-\w.]+)'
)
"""Encoding declaration pattern."""
byte_order_marks: ClassVar[tuple[tuple[bytes, str], ...]] = (
(codecs.BOM_UTF32_BE, 'utf-32'),
(codecs.BOM_UTF32_LE, 'utf-32'),
(codecs.BOM_UTF8, 'utf-8-sig'),
(codecs.BOM_UTF16_BE, 'utf-16'),
(codecs.BOM_UTF16_LE, 'utf-16'),
)
"""Sequence of (start_bytes, encoding) tuples for encoding detection.
The first bytes of input data are checked against the start_bytes strings.
A match indicates the given encoding.
Internal. Will be removed in Docutils 1.0.
"""
def determine_encoding_from_data(self, data: bytes) -> str | None:
"""
Try to determine the encoding of `data` by looking *in* `data`.
Check for a byte order mark (BOM) or an encoding declaration.
Deprecated. Will be removed in Docutils 1.0.
"""
warnings.warn('docutils.io.Input.determine_encoding_from_data() '
'will be removed in Docutils 1.0.',
DeprecationWarning, stacklevel=2)
# check for a byte order mark:
for start_bytes, encoding in self.byte_order_marks:
if data.startswith(start_bytes):
return encoding
# check for an encoding declaration pattern in first 2 lines of file:
for line in data.splitlines()[:2]:
match = self.coding_slug.search(line)
if match:
return match.group(1).decode('ascii')
return None
def isatty(self) -> bool:
"""Return True, if the input source is connected to a TTY device."""
try:
return self.source.isatty()
except AttributeError:
return False
class Output(TransformSpec):
"""
Abstract base class for output wrappers.
Docutils output objects must provide a `write()` method that
expects and handles one argument (the output).
Inheriting `TransformSpec` allows output objects to add "transforms" to
the "Transformer". (Since Docutils 0.19, output objects are no longer
required to be `TransformSpec` instances.)
"""
component_type: Final = 'output'
default_destination_path: ClassVar[str | None] = None
def __init__(
self,
destination: TextIO | str | bytes | None = None,
destination_path: StrPath | None = None,
encoding: str | None = None,
error_handler: str | None = 'strict',
) -> None:
self.encoding: str | None = encoding
"""Text encoding for the output destination."""
self.error_handler: str = error_handler or 'strict'
"""Text encoding error handler."""
self.destination: TextIO | str | bytes | None = destination
"""The destination for output data."""
self.destination_path: StrPath | None = destination_path
"""A text reference to the destination."""
if not destination_path:
self.destination_path = self.default_destination_path
def __repr__(self) -> str:
return ('%s: destination=%r, destination_path=%r'
% (self.__class__, self.destination, self.destination_path))
def write(self, data: str | bytes) -> str | bytes | None:
"""Write `data`. Define in subclasses."""
raise NotImplementedError
def encode(self, data: str | bytes) -> str | bytes:
"""
Encode and return `data`.
If `data` is a `bytes` instance, it is returned unchanged.
Otherwise it is encoded with `self.encoding`.
Provisional: If `self.encoding` is set to the pseudo encoding name
"unicode", `data` must be a `str` instance and is returned unchanged.
"""
if self.encoding and self.encoding.lower() == 'unicode':
assert isinstance(data, str), ('output encoding is "unicode" '
'but `data` is no `str` instance')
return data
if not isinstance(data, str):
# Non-unicode (e.g. bytes) output.
return data
else:
return data.encode(self.encoding, self.error_handler)
class ErrorOutput:
"""
Wrapper class for file-like error streams with
failsafe de- and encoding of `str`, `bytes`, and `Exception` instances.
"""
def __init__(
self,
destination: TextIO | BinaryIO | str | Literal[False] | None = None,
encoding: str | None = None,
encoding_errors: str = 'backslashreplace',
decoding_errors: str = 'replace',
) -> None:
"""
:Parameters:
- `destination`: a file-like object,
a string (path to a file),
`None` (write to `sys.stderr`, default), or
evaluating to `False` (write() requests are ignored).
- `encoding`: `destination` text encoding. Guessed if None.
- `encoding_errors`: how to treat encoding errors.
"""
if destination is None:
destination = sys.stderr
elif not destination:
destination = False
# if `destination` is a file name, open it
elif isinstance(destination, str):
destination = open(destination, 'w')
self.destination: TextIO | BinaryIO | Literal[False] = destination
"""Where warning output is sent."""
self.encoding: str = (
encoding
or getattr(destination, 'encoding', None)
or _locale_encoding
or 'ascii'
)
"""The output character encoding."""
self.encoding_errors: str = encoding_errors
"""Encoding error handler."""
self.decoding_errors: str = decoding_errors
"""Decoding error handler."""
def write(self, data: str | bytes | Exception) -> None:
"""
Write `data` to self.destination. Ignore, if self.destination is False.
`data` can be a `bytes`, `str`, or `Exception` instance.
"""
if not self.destination:
return
if isinstance(data, Exception):
data = str(data)
# The destination is either opened in text or binary mode.
# If data has the wrong type, try to convert it.
try:
self.destination.write(data)
except UnicodeEncodeError:
# Encoding data from string to bytes failed with the
# destination's encoding and error handler.
# Try again with our own encoding and error handler.
binary = data.encode(self.encoding, self.encoding_errors)
self.destination.write(binary)
except TypeError:
if isinstance(data, str): # destination may expect bytes
binary = data.encode(self.encoding, self.encoding_errors)
self.destination.write(binary)
elif self.destination in (sys.stderr, sys.stdout):
# write bytes to raw stream
self.destination.buffer.write(data)
else:
# destination in text mode, write str
string = data.decode(self.encoding, self.decoding_errors)
self.destination.write(string)
def close(self) -> None:
"""
Close the error-output stream.
Ignored if the destination is` sys.stderr` or `sys.stdout` or has no
close() method.
"""
if self.destination in (sys.stdout, sys.stderr):
return
try:
self.destination.close()
except AttributeError:
pass
def isatty(self) -> bool:
"""Return True, if the destination is connected to a TTY device."""
try:
return self.destination.isatty()
except AttributeError:
return False
class FileInput(Input):
"""
Input for single, simple file-like objects.
"""
def __init__(
self,
source: TextIO | None = None,
source_path: StrPath | None = None,
encoding: str | Literal['unicode'] | None = 'utf-8',
error_handler: str | None = 'strict',
autoclose: bool = True,
mode: Literal['r', 'rb', 'br'] = 'r'
) -> None:
"""
:Parameters:
- `source`: either a file-like object (which is read directly), or
`None` (which implies `sys.stdin` if no `source_path` given).
- `source_path`: a path to a file, which is opened for reading.
- `encoding`: the expected text encoding of the input file.
- `error_handler`: the encoding error handler to use.
- `autoclose`: close automatically after read (except when
`sys.stdin` is the source).
- `mode`: how the file is to be opened (see standard function
`open`). The default is read only ('r').
"""
super().__init__(source, source_path, encoding, error_handler)
self.autoclose = autoclose
self._stderr = ErrorOutput()
if source is None:
if source_path:
try:
self.source = open(source_path, mode,
encoding=self.encoding,
errors=self.error_handler)
except OSError as error:
raise InputError(error.errno, error.strerror, source_path)
else:
self.source = sys.stdin
elif check_encoding(self.source, self.encoding) is False:
# TODO: re-open, warn or raise error?
raise UnicodeError('Encoding clash: encoding given is "%s" '
'but source is opened with encoding "%s".' %
(self.encoding, self.source.encoding))
if not source_path:
try:
self.source_path = self.source.name
except AttributeError:
pass
def read(self) -> str:
"""
Read and decode a single file, return as `str`.
"""
try:
if not self.encoding and hasattr(self.source, 'buffer'):
# read as binary data
data = self.source.buffer.read()
# decode with heuristics
data = self.decode(data)
# normalize newlines
data = '\n'.join(data.splitlines()+[''])
else:
data = self.source.read()
finally:
if self.autoclose:
self.close()
return data
def readlines(self) -> list[str]:
"""
Return lines of a single file as list of strings.
"""
return self.read().splitlines(True)
def close(self) -> None:
if self.source is not sys.stdin:
self.source.close()
class FileOutput(Output):
"""Output for single, simple file-like objects."""
default_destination_path: Final = '<file>'
mode: Literal['w', 'a', 'x', 'wb', 'ab', 'xb', 'bw', 'ba', 'bx'] = 'w'
"""The mode argument for `open()`."""
# 'wb' for binary (e.g. OpenOffice) files (see also `BinaryFileOutput`).
# (Do not use binary mode ('wb') for text files, as this prevents the
# conversion of newlines to the system specific default.)
def __init__(self,
destination: TextIO | None = None,
destination_path: StrPath | None = None,
encoding: str | None = None,
error_handler: str | None = 'strict',
autoclose: bool = True,
handle_io_errors: None = None,
mode=None,
) -> None:
"""
:Parameters:
- `destination`: either a file-like object (which is written
directly) or `None` (which implies `sys.stdout` if no
`destination_path` given).
- `destination_path`: a path to a file, which is opened and then
written.
- `encoding`: the text encoding of the output file.
- `error_handler`: the encoding error handler to use.
- `autoclose`: close automatically after write (except when
`sys.stdout` or `sys.stderr` is the destination).
- `handle_io_errors`: ignored, deprecated, will be removed.
- `mode`: how the file is to be opened (see standard function
`open`). The default is 'w', providing universal newline
support for text files.
"""
super().__init__(
destination, destination_path, encoding, error_handler)
self.opened = True
self.autoclose = autoclose
if handle_io_errors is not None:
warnings.warn('io.FileOutput: init argument "handle_io_errors" '
'is ignored and will be removed in '
'Docutils 2.0.', DeprecationWarning, stacklevel=2)
if mode is not None:
self.mode = mode
self._stderr = ErrorOutput()
if destination is None:
if destination_path:
self.opened = False
else:
self.destination = sys.stdout
elif ( # destination is file-type object -> check mode:
mode and hasattr(self.destination, 'mode')
and mode != self.destination.mode):
print('Warning: Destination mode "%s" differs from specified '
'mode "%s"' % (self.destination.mode, mode),
file=self._stderr)
if not destination_path:
try:
self.destination_path = self.destination.name
except AttributeError:
pass
def open(self) -> None:
# Specify encoding
if 'b' not in self.mode:
kwargs = {'encoding': self.encoding,
'errors': self.error_handler}
else:
kwargs = {}
try:
self.destination = open(self.destination_path, self.mode, **kwargs)
except OSError as error:
raise OutputError(error.errno, error.strerror,
self.destination_path)
self.opened = True
def write(self, data: str | bytes) -> str | bytes:
"""Write `data` to a single file, also return it.
`data` can be a `str` or `bytes` instance.
If writing `bytes` fails, an attempt is made to write to
the low-level interface ``self.destination.buffer``.
If `data` is a `str` instance and `self.encoding` and
`self.destination.encoding` are set to different values, `data`
is encoded to a `bytes` instance using `self.encoding`.
Provisional: future versions may raise an error if `self.encoding`
and `self.destination.encoding` are set to different values.
"""
if not self.opened:
self.open()
if (isinstance(data, str)
and check_encoding(self.destination, self.encoding) is False):
if os.linesep != '\n':
data = data.replace('\n', os.linesep) # fix endings
data = self.encode(data)
try:
self.destination.write(data)
except TypeError as err:
if isinstance(data, bytes):
try:
self.destination.buffer.write(data)
except AttributeError:
if check_encoding(self.destination,
self.encoding) is False:
raise ValueError(
f'Encoding of {self.destination_path} '
f'({self.destination.encoding}) differs \n'
f' from specified encoding ({self.encoding})')
else:
raise err
except (UnicodeError, LookupError) as err:
raise UnicodeError(
'Unable to encode output data. output-encoding is: '
f'{self.encoding}.\n({error_string(err)})')
finally:
if self.autoclose:
self.close()
return data
def close(self) -> None:
if self.destination not in (sys.stdout, sys.stderr):
self.destination.close()
self.opened = False
class BinaryFileOutput(FileOutput):
"""
A version of docutils.io.FileOutput which writes to a binary file.
Deprecated. Use `FileOutput` (works with `bytes` since Docutils 0.20).
Will be removed in Docutils 0.24.
"""
# Used by core.publish_cmdline_to_binary() which is also deprecated.
mode = 'wb'
def __init__(self, *args: Any, **kwargs: Any) -> None:
warnings.warn('"BinaryFileOutput" is obsoleted by "FileOutput"'
' and will be removed in Docutils 0.24.',
DeprecationWarning, stacklevel=2)
super().__init__(*args, **kwargs)
class StringInput(Input):
"""Input from a `str` or `bytes` instance."""
source: str | bytes
default_source_path: Final = '<string>'
def read(self) -> str:
"""Return the source as `str` instance.
Decode, if required (see `Input.decode`).
"""
return self.decode(self.source)
class StringOutput(Output):
"""Output to a `bytes` or `str` instance.
Provisional.
"""
destination: str | bytes
default_destination_path: Final = '<string>'
def write(self, data: str | bytes) -> str | bytes:
"""Store `data` in `self.destination`, and return it.
If `self.encoding` is set to the pseudo encoding name "unicode",
`data` must be a `str` instance and is stored/returned unchanged
(cf. `Output.encode`).
Otherwise, `data` can be a `bytes` or `str` instance and is
stored/returned as a `bytes` instance
(`str` data is encoded with `self.encode()`).
Attention: the `output_encoding`_ setting may affect the content
of the output (e.g. an encoding declaration in HTML or XML or the
representation of characters as LaTeX macro vs. literal character).
"""
self.destination = self.encode(data)
return self.destination
class NullInput(Input):
"""Degenerate input: read nothing."""
source: None
default_source_path: Final = 'null input'
def read(self) -> str:
"""Return an empty string."""
return ''
class NullOutput(Output):
"""Degenerate output: write nothing."""
destination: None
default_destination_path: Final = 'null output'
def write(self, data: str | bytes) -> None:
"""Do nothing, return None."""
class DocTreeInput(Input):
"""
Adapter for document tree input.
The document tree must be passed in the ``source`` parameter.
"""
source: nodes.document
default_source_path: Final = 'doctree input'
def read(self) -> nodes.document:
"""Return the document tree."""
return self.source