Skip to main content
Glama

propublica-mcp

argument.py54.9 kB
import inspect import itertools import sys from contextlib import suppress from functools import partial from typing import Any, Callable, Literal, Optional, Sequence, Union, get_args, get_origin from attrs import define, field from cyclopts._convert import ( ITERABLE_TYPES, convert, token_count, ) from cyclopts.annotations import ( contains_hint, is_attrs, is_dataclass, is_namedtuple, is_nonetype, is_pydantic, is_typeddict, is_union, resolve, resolve_annotated, resolve_optional, ) from cyclopts.exceptions import ( CoercionError, CycloptsError, MissingArgumentError, MixedArgumentError, RepeatArgumentError, ValidationError, ) from cyclopts.field_info import ( KEYWORD_ONLY, POSITIONAL_ONLY, POSITIONAL_OR_KEYWORD, VAR_KEYWORD, VAR_POSITIONAL, FieldInfo, _attrs_field_infos, _generic_class_field_infos, _pydantic_field_infos, _typed_dict_field_infos, get_field_infos, signature_parameters, ) from cyclopts.group import Group from cyclopts.parameter import ITERATIVE_BOOL_IMPLICIT_VALUE, Parameter, get_parameters from cyclopts.token import Token from cyclopts.utils import UNSET, grouper, is_builtin # parameter subkeys should not inherit these parameter values from their parent. _PARAMETER_SUBKEY_BLOCKER = Parameter( name=None, converter=None, # pyright: ignore validator=None, accepts_keys=None, consume_multiple=None, env_var=None, ) _kind_parent_child_reassignment = { (POSITIONAL_OR_KEYWORD, POSITIONAL_OR_KEYWORD): POSITIONAL_OR_KEYWORD, (POSITIONAL_OR_KEYWORD, POSITIONAL_ONLY): POSITIONAL_ONLY, (POSITIONAL_OR_KEYWORD, KEYWORD_ONLY): KEYWORD_ONLY, (POSITIONAL_OR_KEYWORD, VAR_POSITIONAL): VAR_POSITIONAL, (POSITIONAL_OR_KEYWORD, VAR_KEYWORD): VAR_KEYWORD, (POSITIONAL_ONLY, POSITIONAL_OR_KEYWORD): POSITIONAL_ONLY, (POSITIONAL_ONLY, POSITIONAL_ONLY): POSITIONAL_ONLY, (POSITIONAL_ONLY, KEYWORD_ONLY): None, (POSITIONAL_ONLY, VAR_POSITIONAL): VAR_POSITIONAL, (POSITIONAL_ONLY, VAR_KEYWORD): None, (KEYWORD_ONLY, POSITIONAL_OR_KEYWORD): KEYWORD_ONLY, (KEYWORD_ONLY, POSITIONAL_ONLY): None, (KEYWORD_ONLY, KEYWORD_ONLY): KEYWORD_ONLY, (KEYWORD_ONLY, VAR_POSITIONAL): None, (KEYWORD_ONLY, VAR_KEYWORD): VAR_KEYWORD, (VAR_POSITIONAL, POSITIONAL_OR_KEYWORD): POSITIONAL_ONLY, (VAR_POSITIONAL, POSITIONAL_ONLY): POSITIONAL_ONLY, (VAR_POSITIONAL, KEYWORD_ONLY): None, (VAR_POSITIONAL, VAR_POSITIONAL): VAR_POSITIONAL, (VAR_POSITIONAL, VAR_KEYWORD): None, (VAR_KEYWORD, POSITIONAL_OR_KEYWORD): KEYWORD_ONLY, (VAR_KEYWORD, POSITIONAL_ONLY): None, (VAR_KEYWORD, KEYWORD_ONLY): KEYWORD_ONLY, (VAR_KEYWORD, VAR_POSITIONAL): None, (VAR_KEYWORD, VAR_KEYWORD): VAR_KEYWORD, } def _startswith(string, prefix): def normalize(s): return s.replace("_", "-") return normalize(string).startswith(normalize(prefix)) def _missing_keys_factory(get_field_info: Callable[[Any], dict[str, FieldInfo]]): def inner(argument: "Argument", data: dict[str, Any]) -> list[str]: provided_keys = set(data) field_info = get_field_info(argument.hint) return [k for k, v in field_info.items() if (v.required and k not in provided_keys)] return inner def _identity_converter(type_, token): return token def _get_annotated_discriminator(annotation): for meta in get_args(annotation)[1:]: try: return meta.discriminator except AttributeError: pass return None class ArgumentCollection(list["Argument"]): """A list-like container for :class:`Argument`.""" def __init__(self, *args): super().__init__(*args) def copy(self) -> "ArgumentCollection": """Returns a shallow copy of the :class:`ArgumentCollection`.""" return type(self)(self) def match( self, term: Union[str, int], *, transform: Optional[Callable[[str], str]] = None, delimiter: str = ".", ) -> tuple["Argument", tuple[str, ...], Any]: """Matches CLI keyword or index to their :class:`Argument`. Parameters ---------- term: str | int One of: * :obj:`str` keyword like ``"--foo"`` or ``"-f"`` or ``"--foo.bar.baz"``. * :obj:`int` global positional index. Raises ------ ValueError If the provided ``term`` doesn't match. Returns ------- Argument Matched :class:`Argument`. Tuple[str, ...] Python keys into Argument. Non-empty iff Argument accepts keys. Any Implicit value (if a flag). :obj:`~.UNSET` otherwise. """ best_match_argument, best_match_keys, best_implicit_value = None, None, UNSET for argument in self: try: match_keys, implicit_value = argument.match(term, transform=transform, delimiter=delimiter) except ValueError: continue if best_match_keys is None or len(match_keys) < len(best_match_keys): best_match_keys = match_keys best_match_argument = argument best_implicit_value = implicit_value if not match_keys: # Perfect match break if best_match_argument is None or best_match_keys is None: raise ValueError(f"No Argument matches {term!r}") return best_match_argument, best_match_keys, best_implicit_value def _set_marks(self, val: bool): for argument in self: argument._marked = val def _convert(self): """Convert and validate all elements.""" self._set_marks(False) for argument in sorted(self, key=lambda x: x.keys): if argument._marked: continue argument.convert_and_validate() @classmethod def _from_type( cls, field_info: FieldInfo, keys: tuple[str, ...], *default_parameters: Optional[Parameter], group_lookup: dict[str, Group], group_arguments: Group, group_parameters: Group, parse_docstring: bool = True, docstring_lookup: Optional[dict[tuple[str, ...], Parameter]] = None, positional_index: Optional[int] = None, _resolve_groups: bool = True, ): out = cls() # groups=list(group_lookup.values())) if docstring_lookup is None: docstring_lookup = {} cyclopts_parameters_no_group = [] hint = field_info.hint hint, hint_parameters = get_parameters(hint) cyclopts_parameters_no_group.extend(hint_parameters) if not keys: # root hint annotation if field_info.kind is field_info.VAR_KEYWORD: hint = dict[str, hint] elif field_info.kind is field_info.VAR_POSITIONAL: hint = tuple[hint, ...] if _resolve_groups: cyclopts_parameters = [] for cparam in cyclopts_parameters_no_group: resolved_groups = [] for group in cparam.group: # pyright:ignore if isinstance(group, str): group = group_lookup[group] resolved_groups.append(group) cyclopts_parameters.append(group.default_parameter) cyclopts_parameters.append(cparam) if resolved_groups: cyclopts_parameters.append(Parameter(group=resolved_groups)) else: cyclopts_parameters = cyclopts_parameters_no_group upstream_parameter = Parameter.combine( ( Parameter(group=group_arguments) if field_info.kind in (field_info.POSITIONAL_ONLY, field_info.VAR_POSITIONAL) else Parameter(group=group_parameters) ), *default_parameters, ) immediate_parameter = Parameter.combine(*cyclopts_parameters) # We do NOT want to skip parse=False arguments here. # This makes it easier to assemble ignored arguments downstream. # resolve/derive the parameter name if keys: cparam = Parameter.combine( upstream_parameter, _PARAMETER_SUBKEY_BLOCKER, immediate_parameter, ) cparam = Parameter.combine( cparam, Parameter( name=_resolve_parameter_name( upstream_parameter.name, # pyright: ignore immediate_parameter.name or tuple(cparam.name_transform(x) for x in field_info.names), # pyright: ignore ) ), ) else: # This is directly on iparam cparam = Parameter.combine( upstream_parameter, immediate_parameter, ) assert isinstance(cparam.alias, tuple) if cparam.name: if field_info.is_keyword: assert isinstance(cparam.name, tuple) cparam = Parameter.combine( cparam, Parameter(name=_resolve_parameter_name(cparam.name + cparam.alias)) ) else: if field_info.kind in (field_info.POSITIONAL_ONLY, field_info.VAR_POSITIONAL): # Name is only used for help-string cparam = Parameter.combine(cparam, Parameter(name=(name.upper() for name in field_info.names))) elif field_info.kind is field_info.VAR_KEYWORD: cparam = Parameter.combine(cparam, Parameter(name=("--[KEYWORD]",))) else: # cparam.name_transform cannot be None due to: # attrs.converters.default_if_none(default_name_transform) assert cparam.name_transform is not None cparam = Parameter.combine( cparam, Parameter( name=tuple("--" + cparam.name_transform(name) for name in field_info.names) + _resolve_parameter_name(cparam.alias) ), ) if field_info.is_keyword_only: positional_index = None argument = Argument(field_info=field_info, parameter=cparam, keys=keys, hint=hint) if not argument._accepts_keywords and positional_index is not None: argument.index = positional_index positional_index += 1 out.append(argument) if argument._accepts_keywords: hint_docstring_lookup = _extract_docstring_help(argument.hint) if parse_docstring else {} hint_docstring_lookup.update(docstring_lookup) for sub_field_name, sub_field_info in argument._lookup.items(): updated_kind = _kind_parent_child_reassignment[(argument.field_info.kind, sub_field_info.kind)] # pyright: ignore if updated_kind is None: continue sub_field_info.kind = updated_kind if sub_field_info.is_keyword_only: positional_index = None subkey_docstring_lookup = { k[1:]: v for k, v in hint_docstring_lookup.items() if k[0] == sub_field_name and len(k) > 1 } subkey_argument_collection = cls._from_type( sub_field_info, keys + (sub_field_name,), cparam, ( Parameter(help=sub_field_info.help) if sub_field_info.help else hint_docstring_lookup.get((sub_field_name,)) ), Parameter(required=argument.required & sub_field_info.required), group_lookup=group_lookup, group_arguments=group_arguments, group_parameters=group_parameters, parse_docstring=parse_docstring, docstring_lookup=subkey_docstring_lookup, positional_index=positional_index, _resolve_groups=_resolve_groups, ) if subkey_argument_collection: argument.children.append(subkey_argument_collection[0]) out.extend(subkey_argument_collection) if positional_index is not None: positional_index = subkey_argument_collection._max_index if positional_index is not None: positional_index += 1 return out @classmethod def _from_callable( cls, func: Callable, *default_parameters: Optional[Parameter], group_lookup: Optional[dict[str, Group]] = None, group_arguments: Optional[Group] = None, group_parameters: Optional[Group] = None, parse_docstring: bool = True, _resolve_groups: bool = True, ): out = cls() if group_arguments is None: group_arguments = Group.create_default_arguments() if group_parameters is None: group_parameters = Group.create_default_parameters() if _resolve_groups: group_lookup = { group.name: group for group in _resolve_groups_from_callable( func, *default_parameters, group_arguments=group_arguments, group_parameters=group_parameters, ) } else: group_lookup = {} docstring_lookup = _extract_docstring_help(func) if parse_docstring else {} positional_index = 0 for field_info in signature_parameters(func).values(): if parse_docstring: subkey_docstring_lookup = { k[1:]: v for k, v in docstring_lookup.items() if k[0] == field_info.name and len(k) > 1 } else: subkey_docstring_lookup = None iparam_argument_collection = cls._from_type( field_info, (), *default_parameters, Parameter(help=field_info.help) if field_info.help else docstring_lookup.get((field_info.name,)), group_lookup=group_lookup, group_arguments=group_arguments, group_parameters=group_parameters, positional_index=positional_index, parse_docstring=parse_docstring, docstring_lookup=subkey_docstring_lookup, _resolve_groups=_resolve_groups, ) if positional_index is not None: positional_index = iparam_argument_collection._max_index if positional_index is not None: positional_index += 1 out.extend(iparam_argument_collection) return out @property def groups(self): groups = [] for argument in self: assert isinstance(argument.parameter.group, tuple) for group in argument.parameter.group: if group not in groups: groups.append(group) return groups @property def _root_arguments(self): for argument in self: if not argument.keys: yield argument @property def _max_index(self) -> Optional[int]: return max((x.index for x in self if x.index is not None), default=None) def filter_by( self, *, group: Optional[Group] = None, has_tokens: Optional[bool] = None, has_tree_tokens: Optional[bool] = None, keys_prefix: Optional[tuple[str, ...]] = None, kind: Optional[inspect._ParameterKind] = None, parse: Optional[bool] = None, show: Optional[bool] = None, value_set: Optional[bool] = None, ) -> "ArgumentCollection": """Filter the :class:`ArgumentCollection`. All non-:obj:`None` filters will be applied. Parameters ---------- group: Optional[Group] The :class:`.Group` the arguments should be in. has_tokens: Optional[bool] Immediately has tokens (not including children). has_tree_tokens: Optional[bool] Argument and/or it's children have parsed tokens. kind: Optional[inspect._ParameterKind] The :attr:`~inspect.Parameter.kind` of the argument. parse: Optional[bool] If the argument is intended to be parsed or not. show: Optional[bool] The Argument is intended to be show on the help page. value_set: Optional[bool] The converted value is set. """ ac = self.copy() cls = type(self) if group is not None: ac = cls(x for x in ac if group in x.parameter.group) # pyright: ignore if kind is not None: ac = cls(x for x in ac if x.field_info.kind == kind) if has_tokens is not None: ac = cls(x for x in ac if not (bool(x.tokens) ^ bool(has_tokens))) if has_tree_tokens is not None: ac = cls(x for x in ac if not (bool(x.tokens) ^ bool(has_tree_tokens))) if keys_prefix is not None: ac = cls(x for x in ac if x.keys[: len(keys_prefix)] == keys_prefix) if show is not None: ac = cls(x for x in ac if not (x.show ^ bool(show))) if value_set is not None: ac = cls(x for x in ac if ((x.value is UNSET) ^ bool(value_set))) if parse is not None: ac = cls(x for x in ac if not (x.parameter.parse ^ parse)) return ac @define(kw_only=True) class Argument: """Encapsulates functionality and additional contextual information for parsing a parameter. An argument is defined as anything that would have its own entry in the help page. """ tokens: list[Token] = field(factory=list) """ List of :class:`.Token` parsed from various sources. Do not directly mutate; see :meth:`append`. """ field_info: FieldInfo = field(factory=FieldInfo) """ Additional information about the parameter from surrounding python syntax. """ parameter: Parameter = field(factory=Parameter) # pyright: ignore """ Fully resolved user-provided :class:`.Parameter`. """ hint: Any = field(default=str, converter=resolve) """ The type hint for this argument; may be different from :attr:`.FieldInfo.annotation`. """ index: Optional[int] = field(default=None) """ Associated python positional index for argument. If ``None``, then cannot be assigned positionally. """ keys: tuple[str, ...] = field(default=()) """ **Python** keys that lead to this leaf. ``self.parameter.name`` and ``self.keys`` can naively disagree! For example, a ``self.parameter.name="--foo.bar.baz"`` could be aliased to "--fizz". The resulting ``self.keys`` would be ``("bar", "baz")``. This is populated based on type-hints and class-structure, not ``Parameter.name``. .. code-block:: python from cyclopts import App, Parameter from dataclasses import dataclass from typing import Annotated app = App() @dataclass class User: id: int name: Annotated[str, Parameter(name="--fullname")] @app.default def main(user: User): pass for argument in app.assemble_argument_collection(): print(f"name: {argument.name:16} hint: {str(argument.hint):16} keys: {str(argument.keys)}") .. code-block:: bash $ my-script name: --user.id hint: <class 'int'> keys: ('id',) name: --fullname hint: <class 'str'> keys: ('name',) """ # Converted value; may be stale. _value: Any = field(alias="value", default=UNSET) """ Converted value from last :meth:`convert` call. This value may be stale if fields have changed since last :meth:`convert` call. :class:`.UNSET` if :meth:`convert` has not yet been called with tokens. """ _accepts_keywords: bool = field(default=False, init=False, repr=False) _default: Any = field(default=None, init=False, repr=False) _lookup: dict[str, FieldInfo] = field(factory=dict, init=False, repr=False) children: "ArgumentCollection" = field(factory=ArgumentCollection, init=False, repr=False) """ Collection of other :class:`Argument` that eventually culminate into the python variable represented by :attr:`field_info`. """ _marked_converted: bool = field(default=False, init=False, repr=False) # for mark & sweep algos _mark_converted_override: bool = field(default=False, init=False, repr=False) # Validator to be called based on builtin type support. _missing_keys_checker: Optional[Callable] = field(default=None, init=False, repr=False) _internal_converter: Optional[Callable] = field(default=None, init=False, repr=False) def __attrs_post_init__(self): # By definition, self.hint is Not AnnotatedType hint = resolve(self.hint) hints = get_args(hint) if is_union(hint) else (hint,) if not self.parameter.parse: return if self.parameter.accepts_keys is False: # ``None`` means to infer. return for hint in hints: # ``self.parameter.accepts_keys`` is either ``None`` or ``True`` here origin = get_origin(hint) hint_origin = {hint, origin} # Classes that ALWAYS takes keywords (accepts_keys=None) field_infos = get_field_infos(hint) if dict in hint_origin: self._accepts_keywords = True key_type, val_type = str, str args = get_args(hint) with suppress(IndexError): key_type = args[0] val_type = args[1] if key_type is not str: raise TypeError('Dictionary type annotations must have "str" keys.') self._default = val_type elif is_typeddict(hint): self._missing_keys_checker = _missing_keys_factory(_typed_dict_field_infos) self._accepts_keywords = True self._update_lookup(field_infos) elif is_dataclass(hint): # Typical usecase of a dataclass will have more than 1 field. self._missing_keys_checker = _missing_keys_factory(_generic_class_field_infos) self._accepts_keywords = True self._update_lookup(field_infos) elif is_namedtuple(hint): # collections.namedtuple does not have type hints, assume "str" for everything. self._missing_keys_checker = _missing_keys_factory(_generic_class_field_infos) self._accepts_keywords = True if not hasattr(hint, "__annotations__"): raise ValueError("Cyclopts cannot handle collections.namedtuple in python <3.10.") self._update_lookup(field_infos) elif is_attrs(hint): self._missing_keys_checker = _missing_keys_factory(_attrs_field_infos) self._accepts_keywords = True self._update_lookup(field_infos) elif is_pydantic(hint): self._missing_keys_checker = _missing_keys_factory(_pydantic_field_infos) self._accepts_keywords = True # pydantic's __init__ signature doesn't accurately reflect its requirements. # so we cannot use _generic_class_required_optional(...) self._update_lookup(field_infos) elif not is_builtin(hint) and field_infos: # Some classic user class. self._missing_keys_checker = _missing_keys_factory(_generic_class_field_infos) self._accepts_keywords = True self._update_lookup(field_infos) elif self.parameter.accepts_keys is None: # Typical builtin hint continue if self.parameter.accepts_keys is None: continue # Only explicit ``self.parameter.accepts_keys == True`` from here on # Classes that MAY take keywords (accepts_keys=True) # They must be explicitly specified ``accepts_keys=True`` because otherwise # providing a single positional argument is what we want. self._accepts_keywords = True self._missing_keys_checker = _missing_keys_factory(_generic_class_field_infos) for i, field_info in enumerate(signature_parameters(hint.__init__).values()): if i == 0 and field_info.name == "self": continue if field_info.kind is field_info.VAR_KEYWORD: self._default = field_info.annotation else: self._update_lookup({field_info.name: field_info}) def _update_lookup(self, field_infos: dict[str, FieldInfo]): discriminator = _get_annotated_discriminator(self.field_info.annotation) for key, field_info in field_infos.items(): if existing_field_info := self._lookup.get(key): if existing_field_info == field_info: pass elif discriminator and discriminator in field_info.names and discriminator in existing_field_info.names: existing_field_info.annotation = Literal[existing_field_info.annotation, field_info.annotation] # pyright: ignore existing_field_info.default = FieldInfo.empty else: raise NotImplementedError else: self._lookup[key] = field_info @property def value(self): """Converted value from last :meth:`convert` call. This value may be stale if fields have changed since last :meth:`convert` call. :class:`.UNSET` if :meth:`convert` has not yet been called with tokens. """ return self._value @value.setter def value(self, val): if self._marked: self._mark_converted_override = True self._marked = True self._value = val @property def _marked(self): """If ``True``, then this node in the tree has already been converted and ``value`` has been populated.""" return self._marked_converted | self._mark_converted_override @_marked.setter def _marked(self, value: bool): self._marked_converted = value @property def _accepts_arbitrary_keywords(self) -> bool: args = get_args(self.hint) if is_union(self.hint) else (self.hint,) return any(dict in (arg, get_origin(arg)) for arg in args) @property def show_default(self) -> Union[bool, Callable[[Any], str]]: """Show the default value on the help page.""" if self.required: # By definition, a required parameter cannot have a default. return False elif self.parameter.show_default is None: # Showing a default ``None`` value is typically not helpful to the end-user. return self.field_info.default not in (None, self.field_info.empty) elif (self.field_info.default is self.field_info.empty) or not self.parameter.show_default: return False else: return self.parameter.show_default @property def _use_pydantic_type_adapter(self) -> bool: return bool( is_pydantic(self.hint) or ( is_union(self.hint) and ( any(is_pydantic(x) for x in get_args(self.hint)) or _get_annotated_discriminator(self.field_info.annotation) ) ) ) def _type_hint_for_key(self, key: str): try: return self._lookup[key].annotation except KeyError: if self._default is None: raise return self._default def _should_attempt_json_dict(self, tokens: Optional[Sequence[Union[Token, str]]] = None) -> bool: """When parsing, should attempt to parse the token(s) as json dict data.""" if tokens is None: tokens = self.tokens if not tokens: return False if not self._accepts_keywords: return False value = tokens[0].value if isinstance(tokens[0], Token) else tokens[0] if not value.strip().startswith("{"): return False if self.parameter.json_dict is not None: return self.parameter.json_dict if contains_hint(self.field_info.annotation, str): return False return True def _should_attempt_json_list( self, tokens: Union[Sequence[Union[Token, str]], Token, str, None] = None, keys: tuple[str, ...] = () ) -> bool: """When parsing, should attempt to parse the token(s) as json list data.""" if tokens is None: tokens = self.tokens if not tokens: return False _, consume_all = self.token_count(keys) if not consume_all: return False if isinstance(tokens, Token): value = tokens.value elif isinstance(tokens, str): value = tokens else: value = tokens[0].value if isinstance(tokens[0], Token) else tokens[0] if not value.strip().startswith("["): return False if self.parameter.json_list is not None: return self.parameter.json_list for arg in get_args(self.field_info.annotation) or (str,): if contains_hint(arg, str): return False return True def match( self, term: Union[str, int], *, transform: Optional[Callable[[str], str]] = None, delimiter: str = ".", ) -> tuple[tuple[str, ...], Any]: """Match a name search-term, or a positional integer index. Raises ------ ValueError If no match is found. Returns ------- Tuple[str, ...] Leftover keys after matching to this argument. Used if this argument accepts_arbitrary_keywords. Any Implicit value. :obj:`~.UNSET` if no implicit value is applicable. """ if not self.parameter.parse: raise ValueError return ( self._match_index(term) if isinstance(term, int) else self._match_name(term, transform=transform, delimiter=delimiter) ) def _match_name( self, term: str, *, transform: Optional[Callable[[str], str]] = None, delimiter: str = ".", ) -> tuple[tuple[str, ...], Any]: """Check how well this argument matches a token keyword identifier. Parameters ---------- term: str Something like "--foo" transform: Callable Function that converts the cyclopts Parameter name(s) into something that should be compared against ``term``. Raises ------ ValueError If no match found. Returns ------- Tuple[str, ...] Leftover keys after matching to this argument. Used if this argument accepts_arbitrary_keywords. Any Implicit value. """ if self.field_info.kind is self.field_info.VAR_KEYWORD: return tuple(term.lstrip("-").split(delimiter)), UNSET trailing = term implicit_value = UNSET assert self.parameter.name for name in self.parameter.name: if transform: name = transform(name) if _startswith(term, name): trailing = term[len(name) :] implicit_value = True if self.hint is bool or self.hint in ITERATIVE_BOOL_IMPLICIT_VALUE else UNSET if trailing: if trailing[0] == delimiter: trailing = trailing[1:] break # Otherwise, it's not an actual match. else: # exact match return (), implicit_value else: # No positive-name matches found. hint = resolve_annotated(self.field_info.annotation) if is_union(hint): hints = get_args(hint) else: hints = (hint,) for hint in hints: hint = resolve_annotated(hint) double_break = False for name in self.parameter.get_negatives(hint): if transform: name = transform(name) if term.startswith(name): trailing = term[len(name) :] if hint in ITERATIVE_BOOL_IMPLICIT_VALUE: implicit_value = False elif is_nonetype(hint) or hint is None: implicit_value = None else: hint = resolve_optional(hint) implicit_value = (get_origin(hint) or hint)() # pyright: ignore[reportAbstractUsage] if trailing: if trailing[0] == delimiter: trailing = trailing[1:] double_break = True break # Otherwise, it's not an actual match. else: # exact match return (), implicit_value if double_break: break else: # No negative-name matches found. raise ValueError if not self._accepts_arbitrary_keywords: # Still not an actual match. raise ValueError return tuple(trailing.split(delimiter)), implicit_value def _match_index(self, index: int) -> tuple[tuple[str, ...], Any]: if self.index is None: raise ValueError elif self.field_info.kind is self.field_info.VAR_POSITIONAL: if index < self.index: raise ValueError elif index != self.index: raise ValueError return (), UNSET def append(self, token: Token): """Safely add a :class:`Token`.""" if not self.parameter.parse: raise ValueError if any(x.address == token.address for x in self.tokens): _, consume_all = self.token_count(token.keys) if not consume_all: raise RepeatArgumentError(token=token) if self.tokens: if bool(token.keys) ^ any(x.keys for x in self.tokens): raise MixedArgumentError(argument=self) self.tokens.append(token) @property def has_tokens(self) -> bool: """This argument, or a child argument, has at least 1 parsed token.""" # noqa: D404 return bool(self.tokens) or any(x.has_tokens for x in self.children) @property def children_recursive(self) -> "ArgumentCollection": out = ArgumentCollection() for child in self.children: out.append(child) out.extend(child.children_recursive) return out def _convert_pydantic(self): if self.has_tokens: import pydantic unstructured_data = self._json() try: # This inherently also invokes pydantic validators return pydantic.TypeAdapter(self.field_info.annotation).validate_python(unstructured_data) except pydantic.ValidationError as e: self._handle_pydantic_validation_error(e) else: return UNSET def _convert(self, converter: Optional[Callable] = None): if self.parameter.converter: converter = self.parameter.converter elif converter is None: converter = partial(convert, name_transform=self.parameter.name_transform) def safe_converter(hint, tokens): if isinstance(tokens, dict): try: return converter(hint, tokens) # pyright: ignore except (AssertionError, ValueError, TypeError) as e: raise CoercionError(msg=e.args[0] if e.args else None, argument=self, target_type=hint) from e else: try: return converter(hint, tokens) # pyright: ignore except (AssertionError, ValueError, TypeError) as e: token = tokens[0] if len(tokens) == 1 else None raise CoercionError( msg=e.args[0] if e.args else None, argument=self, target_type=hint, token=token ) from e if not self.parameter.parse: out = UNSET elif not self.children: positional: list[Token] = [] keyword = {} def expand_tokens(tokens): for token in tokens: if self._should_attempt_json_list(token): import json try: parsed_json = json.loads(token.value) except json.JSONDecodeError as e: raise CoercionError(token=token, target_type=self.hint) from e if not isinstance(parsed_json, list): raise CoercionError(token=token, target_type=self.hint) for element in parsed_json: if element is None: yield token.evolve(value="", implicit_value=element) else: yield token.evolve(value=str(element)) else: yield token expanded_tokens = list(expand_tokens(self.tokens)) for token in expanded_tokens: if token.implicit_value is not UNSET and isinstance( token.implicit_value, get_origin(self.hint) or self.hint ): assert len(expanded_tokens) == 1 return token.implicit_value if token.keys: lookup = keyword for key in token.keys[:-1]: lookup = lookup.setdefault(key, {}) lookup.setdefault(token.keys[-1], []).append(token) else: positional.append(token) if positional and keyword: # pragma: no cover # This should never happen due to checks in ``Argument.append`` raise MixedArgumentError(argument=self) if positional: if self.field_info and self.field_info.kind is self.field_info.VAR_POSITIONAL: # Apply converter to individual values hint = get_args(self.hint)[0] tokens_per_element, _ = self.token_count() out = tuple(safe_converter(hint, values) for values in grouper(positional, tokens_per_element)) else: out = safe_converter(self.hint, tuple(positional)) elif keyword: if self.field_info and self.field_info.kind is self.field_info.VAR_KEYWORD and not self.keys: # Apply converter to individual values out = {key: safe_converter(get_args(self.hint)[1], value) for key, value in keyword.items()} else: out = safe_converter(self.hint, keyword) elif self.required: raise MissingArgumentError(argument=self) else: # no tokens return UNSET else: # A dictionary-like structure. data = {} if self._should_attempt_json_dict(): # Dict-like structures may have incoming json data from an environment variable. # Pass these values along as Tokens to children. import json from cyclopts.config._common import update_argument_collection while self.tokens: token = self.tokens.pop(0) try: parsed_json = json.loads(token.value) except json.JSONDecodeError as e: raise CoercionError(token=token, target_type=self.hint) from e update_argument_collection( {self.name.lstrip("-"): parsed_json}, token.source, self.children_recursive, root_keys=(), allow_unknown=False, ) if self._use_pydantic_type_adapter: return self._convert_pydantic() for child in self.children: assert len(child.keys) == (len(self.keys) + 1) if child.has_tokens: # Either the child directly has tokens, or a nested child has tokens. data[child.keys[-1]] = child.convert_and_validate(converter=converter) elif child.required: # Check if the required fields are already populated. obj = data for k in child.keys: try: obj = obj[k] except Exception: raise MissingArgumentError(argument=child) from None child._marked = True self._run_missing_keys_checker(data) if data: out = self.hint(**data) elif self.required: # This should NEVER happen: empty data to a required dict field. raise MissingArgumentError(argument=self) # pragma: no cover else: out = UNSET return out def convert(self, converter: Optional[Callable] = None): """Converts :attr:`tokens` into :attr:`value`. Parameters ---------- converter: Optional[Callable] Converter function to use. Overrides ``self.parameter.converter`` Returns ------- Any The converted data. Same as :attr:`value`. """ if not self._marked: try: self.value = self._convert(converter=converter) except CoercionError as e: if e.argument is None: e.argument = self if e.target_type is None: e.target_type = self.hint raise except CycloptsError as e: if e.argument is None: e.argument = self raise return self.value def validate(self, value): """Validates provided value. Parameters ---------- value: Value to validate. Returns ------- Any The converted data. Same as :attr:`value`. """ assert isinstance(self.parameter.validator, tuple) if "pydantic" in sys.modules: import pydantic pydantic_version = tuple(int(x) for x in pydantic.__version__.split(".")) if pydantic_version < (2,): # Cyclopts does NOT support/use pydantic v1. pydantic = None else: pydantic = None def validate_pydantic(hint, val): if not pydantic: return if self._use_pydantic_type_adapter: # Pydantic already called the validators return try: pydantic.TypeAdapter(hint).validate_python(val) except pydantic.ValidationError as e: self._handle_pydantic_validation_error(e) except pydantic.PydanticUserError: # Pydantic couldn't generate a schema for this type hint. pass try: if not self.keys and self.field_info and self.field_info.kind is self.field_info.VAR_KEYWORD: hint = get_args(self.hint)[1] for validator in self.parameter.validator: for val in value.values(): validator(hint, val) validate_pydantic(dict[str, self.field_info.annotation], value) elif self.field_info and self.field_info.kind is self.field_info.VAR_POSITIONAL: hint = get_args(self.hint)[0] for validator in self.parameter.validator: for val in value: validator(hint, val) validate_pydantic(tuple[self.field_info.annotation, ...], value) else: for validator in self.parameter.validator: validator(self.hint, value) validate_pydantic(self.field_info.annotation, value) except (AssertionError, ValueError, TypeError) as e: raise ValidationError(exception_message=e.args[0] if e.args else "", argument=self) from e def convert_and_validate(self, converter: Optional[Callable] = None): """Converts and validates :attr:`tokens` into :attr:`value`. Parameters ---------- converter: Optional[Callable] Converter function to use. Overrides ``self.parameter.converter`` Returns ------- Any The converted data. Same as :attr:`value`. """ val = self.convert(converter=converter) if val is not UNSET: self.validate(val) elif self.field_info.default is not FieldInfo.empty: self.validate(self.field_info.default) return val def token_count(self, keys: tuple[str, ...] = ()): """The number of string tokens this argument consumes. Parameters ---------- keys: tuple[str, ...] The **python** keys into this argument. If provided, returns the number of string tokens that specific data type within the argument consumes. Returns ------- int Number of string tokens to create 1 element. consume_all: bool :obj:`True` if this data type is iterable. """ if len(keys) > 1: hint = self._default elif len(keys) == 1: hint = self._type_hint_for_key(keys[0]) else: hint = self.hint tokens_per_element, consume_all = token_count(hint) return tokens_per_element, consume_all @property def negatives(self): """Negative flags from :meth:`.Parameter.get_negatives`.""" return self.parameter.get_negatives(resolve_annotated(self.field_info.annotation)) @property def name(self) -> str: """The **first** provided name this argument goes by.""" return self.names[0] @property def names(self) -> tuple[str, ...]: """Names the argument goes by (both positive and negative).""" assert isinstance(self.parameter.name, tuple) return tuple(itertools.chain(self.parameter.name, self.negatives)) def env_var_split(self, value: str, delimiter: Optional[str] = None) -> list[str]: """Split a given value with :meth:`.Parameter.env_var_split`.""" return self.parameter.env_var_split(self.hint, value, delimiter=delimiter) @property def show(self) -> bool: """Show this argument on the help page. If an argument has child arguments, don't show it on the help-page. """ return not self.children and self.parameter.show @property def required(self) -> bool: """Whether or not this argument requires a user-provided value.""" if self.parameter.required is None: return self.field_info.required else: return self.parameter.required def _json(self) -> dict: """Convert argument to be json-like for pydantic. All values will be str/list/dict. """ out = {} if self._accepts_keywords: for token in self.tokens: node = out for key in token.keys[:-1]: node = node.setdefault(key, {}) node[token.keys[-1]] = token.value if token.implicit_value is UNSET else token.implicit_value for child in self.children: child._marked = True if not child.has_tokens: continue keys = child.keys[len(self.keys) :] if child._accepts_keywords: result = child._json() if result: out[keys[0]] = result elif (get_origin(child.hint) or child.hint) in ITERABLE_TYPES: out.setdefault(keys[-1], []).extend([token.value for token in child.tokens]) else: token = child.tokens[0] out[keys[0]] = token.value if token.implicit_value is UNSET else token.implicit_value return out def _run_missing_keys_checker(self, data): if not self._missing_keys_checker or (not self.required and not data): return if not (missing_keys := self._missing_keys_checker(self, data)): return # Report the first missing argument. missing_key = missing_keys[0] keys = self.keys + (missing_key,) missing_arguments = self.children.filter_by(keys_prefix=keys) if missing_arguments: raise MissingArgumentError(argument=missing_arguments[0]) else: missing_description = self.field_info.names[0] + "->" + "->".join(keys) raise ValueError( f'Required field "{missing_description}" is not accessible by Cyclopts; possibly due to conflicting POSITIONAL/KEYWORD requirements.' ) def _handle_pydantic_validation_error(self, exc): import pydantic error = exc.errors()[0] if error["type"] == "missing": missing_argument = self.children_recursive.filter_by(keys_prefix=self.keys + error["loc"])[0] raise MissingArgumentError(argument=missing_argument) from exc elif isinstance(exc, pydantic.ValidationError): raise ValidationError(exception_message=str(exc), argument=self) from exc else: raise exc def _resolve_groups_from_callable( func: Callable[..., Any], *default_parameters: Optional[Parameter], group_arguments: Optional[Group] = None, group_parameters: Optional[Group] = None, ) -> list[Group]: argument_collection = ArgumentCollection._from_callable( func, *default_parameters, group_arguments=group_arguments, group_parameters=group_parameters, parse_docstring=False, _resolve_groups=False, ) resolved_groups = [] if group_arguments is not None: resolved_groups.append(group_arguments) if group_parameters is not None: resolved_groups.append(group_parameters) # Iteration 1: Collect all explicitly instantiated groups for argument in argument_collection: for group in argument.parameter.group: # pyright: ignore if not isinstance(group, Group): continue # Ensure a different, but same-named group doesn't already exist if any(group != x and x._name == group._name for x in resolved_groups): raise ValueError("Cannot register 2 distinct Group objects with same name.") if group.default_parameter is not None and group.default_parameter.group: # This shouldn't be possible due to ``Group`` internal checks. raise ValueError("Group.default_parameter cannot have a specified group.") # pragma: no cover # Add the group to resolved_groups if it hasn't been added yet. try: next(x for x in resolved_groups if x._name == group._name) except StopIteration: resolved_groups.append(group) # Iteration 2: Create all implicitly defined Group from strings. for argument in argument_collection: for group in argument.parameter.group: # pyright: ignore if not isinstance(group, str): continue try: next(x for x in resolved_groups if x.name == group) except StopIteration: resolved_groups.append(Group(group)) return resolved_groups def _extract_docstring_help(f: Callable) -> dict[tuple[str, ...], Parameter]: from docstring_parser import parse_from_object # Handle functools.partial with suppress(AttributeError): f = f.func # pyright: ignore[reportFunctionMemberAccess] try: return { tuple(dparam.arg_name.split(".")): Parameter(help=dparam.description) for dparam in parse_from_object(f).params } except TypeError: # Type hints like ``dict[str, str]`` trigger this. return {} def _resolve_parameter_name_helper(elem): if elem.endswith("*"): elem = elem[:-1].rstrip(".") if elem and not elem.startswith("-"): elem = "--" + elem return elem def _resolve_parameter_name(*argss: tuple[str, ...]) -> tuple[str, ...]: """Resolve parameter names by combining and formatting multiple tuples of strings. Parameters ---------- *argss Each tuple represents a group of parameter name components. Returns ------- tuple[str, ...] A tuple of resolved parameter names. """ argss = tuple(x for x in argss if x) if len(argss) == 0: return () elif len(argss) == 1: return tuple("*" if x == "*" else _resolve_parameter_name_helper(x) for x in argss[0]) # Combine the first 2, and do a recursive call. out = [] for a1 in argss[0]: a1 = _resolve_parameter_name_helper(a1) for a2 in argss[1]: if a2.startswith("-") or not a1: out.append(a2) else: out.append(a1 + "." + a2) return _resolve_parameter_name(tuple(out), *argss[2:])

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/asachs01/propublica-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server