read_file
Read the contents of a file as UTF-8 text, with an option to limit the number of bytes retrieved.
Instructions
Read up to max_bytes bytes of the file at path as UTF-8 text.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
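| path | Yes | Path of the file to read, resolved inside the sandbox root. | |
| max_bytes | No | Maximum number of bytes to read. | 100000 |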
Implementation Reference
- mcpforge/builtin/filesystem.py:46-60 (handler) The core handler for the read_file tool. Reads up to `max_bytes` bytes from a sandboxed file path as UTF-8 text, with truncation detection.
@tool
def read_file(self, path: str, max_bytes: int = 100_000) -> str:
    """Read up to `max_bytes` bytes of the file at `path` as UTF-8 text."""
    # NOTE(review): the published tool instructions match the docstring above
    # word for word, so it may be read at runtime -- keep its wording stable.
    #
    # Returns the decoded text; undecodable bytes are replaced instead of
    # raising. When the file holds more than `max_bytes` bytes, a
    # "[truncated at N bytes]" marker is appended after the text.
    #
    # Raises:
    #   ValueError: `max_bytes` is negative (or `_safe_join` rejects `path`).
    #   FileNotFoundError: nothing exists at the resolved path.
    #   IsADirectoryError: the resolved path exists but is not a regular file.
    if max_bytes < 0:
        # A negative size would turn the read below into "read everything"
        # (e.g. max_bytes=-2 -> f.read(-1)), silently defeating the cap.
        raise ValueError(f"max_bytes must be non-negative, got {max_bytes}")

    target = self._safe_join(path)
    if not target.exists():
        raise FileNotFoundError(f"no such file: {path}")
    if not target.is_file():
        raise IsADirectoryError(f"not a file: {path}")

    with target.open("rb") as f:
        # Ask for one byte more than allowed: receiving it proves the file is
        # longer than the cap without reading the rest.
        data = f.read(max_bytes + 1)

    truncated = len(data) > max_bytes
    body = data[:max_bytes].decode("utf-8", errors="replace")
    if truncated:
        body += f"\n\n[truncated at {max_bytes} bytes]"
    return body

# - mcpforge/schema.py:139-175 (schema) function_to_input_schema generates
#   JSON Schema input schemas from function signatures, used to produce the
#   inputSchema for read_file's parameters (path: str, max_bytes: int).
def function_to_input_schema(func: typing.Callable[..., Any]) -> dict[str, Any]:
    """Describe a callable's parameters as a JSON Schema object.

    Every parameter except ``self``/``cls`` and the ``*args``/``**kwargs``
    catch-alls becomes one property. Parameters without a default are listed
    under ``required``; a default is copied into the property schema only
    when ``json.dumps`` accepts it.
    """
    import json as _json

    signature = inspect.signature(func)
    try:
        resolved_hints = typing.get_type_hints(func)
    except Exception:
        # Unresolvable annotations are not fatal: fall back to the raw
        # annotation stored on each parameter.
        resolved_hints = {}

    variadic_kinds = (
        inspect.Parameter.VAR_POSITIONAL,
        inspect.Parameter.VAR_KEYWORD,
    )
    no_default = inspect.Parameter.empty

    properties: dict[str, Any] = {}
    required: list[str] = []
    for param_name, parameter in signature.parameters.items():
        if param_name in ("self", "cls") or parameter.kind in variadic_kinds:
            continue

        declared = resolved_hints.get(param_name, parameter.annotation)
        entry = python_type_to_json_schema(declared)

        if parameter.default is no_default:
            required.append(param_name)
        else:
            try:
                _json.dumps(parameter.default)
                entry = {**entry, "default": parameter.default}
            except (TypeError, ValueError):
                # Default cannot be expressed in JSON: leave it out of the
                # schema; the parameter still stays optional.
                pass

        properties[param_name] = entry

    schema: dict[str, Any] = {
        "type": "object",
        "properties": properties,
        "additionalProperties": False,
    }
    if required:
        schema["required"] = required
    return schema

# - mcpforge/decorator.py:74-120 (registration) The @serve decorator that
#   scans a class for @tool-decorated methods (like read_file) and registers
#   them as Tool objects in ServerInfo.
def serve(
    *,
    name: str,
    version: str = "0.1.0",
    description: str = "",
    capabilities: Capability | None = None,
) -> Callable[[C], C]:
    """Build a class decorator that declares the class as an MCP server.

    The returned decorator creates a ``ServerInfo``, registers every callable
    member that carries the tool or resource marker attribute, stores the
    result on the class under ``META_ATTR`` and returns the class itself.
    When ``description`` is empty, the first line of the class docstring is
    used instead.
    """

    def _wrap(cls: C) -> C:
        if description:
            summary = description
        else:
            class_doc = inspect.getdoc(cls) or ""
            summary = class_doc.strip().split("\n")[0]
        server_caps = capabilities or Capability()
        server_info = ServerInfo(
            name=name,
            version=version,
            description=summary,
            capabilities=server_caps,
        )

        # The predicate keeps only callable members, in getmembers' name order.
        for attr_name, member in inspect.getmembers(cls, callable):
            if hasattr(member, TOOL_ATTR):
                tool_meta = getattr(member, TOOL_ATTR)
                tool_schema = function_to_input_schema(member)
                tool_entry = Tool(
                    name=tool_meta["name"],
                    description=tool_meta["description"],
                    input_schema=tool_schema,
                    func=member,
                    method_name=attr_name,
                )
                server_info.add_tool(tool_entry)

            # Checked independently of the tool marker: a member carrying
            # both attributes is registered twice, once per kind.
            if hasattr(member, RESOURCE_ATTR):
                resource_meta = getattr(member, RESOURCE_ATTR)
                resource_entry = Resource(
                    uri=resource_meta["uri"],
                    name=resource_meta["name"],
                    description=resource_meta["description"],
                    mime_type=resource_meta["mime_type"],
                    func=member,
                )
                server_info.add_resource(resource_entry)

        setattr(cls, META_ATTR, server_info)
        return cls

    return _wrap

# - mcpforge/decorator.py:74-117 (registration) The @tool decorator (lines
#   38-50) that marks read_file as a tool by attaching metadata (name,
#   description). The @serve decorator (lines 74-120) collects all
#   @tool-marked methods and registers them.
def serve(
    *,
    name: str,
    version: str = "0.1.0",
    description: str = "",
    capabilities: Capability | None = None,
) -> Callable[[C], C]:
    """Decorate a class to declare it as an MCP server.

    NOTE(review): this excerpt is cited as decorator.py lines 74-117 and
    stops after the ``setattr`` call. The full listing of the same function
    (cited as lines 74-120) goes on to ``return cls`` from ``_wrap`` and
    ``return _wrap`` from ``serve``; those lines are simply not shown here.
    """
    def _wrap(cls: C) -> C:
        # Server-level metadata. An empty `description` falls back to the
        # first line of the class docstring.
        info = ServerInfo(
            name=name,
            version=version,
            description=description or (inspect.getdoc(cls) or "").strip().split("\n")[0],
            capabilities=capabilities or Capability(),
        )
        for attr_name, member in inspect.getmembers(cls):
            # Only callables can carry the tool/resource marker attributes.
            if not callable(member):
                continue
            if hasattr(member, TOOL_ATTR):
                meta = getattr(member, TOOL_ATTR)
                # The input schema is derived from the member's signature.
                input_schema = function_to_input_schema(member)
                info.add_tool(
                    Tool(
                        name=meta["name"],
                        description=meta["description"],
                        input_schema=input_schema,
                        func=member,
                        method_name=attr_name,
                    )
                )
            # Independent of the tool check above (`if`, not `elif`).
            if hasattr(member, RESOURCE_ATTR):
                meta = getattr(member, RESOURCE_ATTR)
                info.add_resource(
                    Resource(
                        uri=meta["uri"],
                        name=meta["name"],
                        description=meta["description"],
                        mime_type=meta["mime_type"],
                        func=member,
                    )
                )
        # Attach the collected metadata to the decorated class.
        setattr(cls, META_ATTR, info)

# - mcpforge/builtin/filesystem.py:21-30 (helper) _safe_join is a helper
#   method that resolves paths relative to the sandbox root and prevents path
#   traversal attacks, used by read_file to validate input.
def _safe_join(self, path: str) -> Path:
    """Resolve `path` under self.root and verify it stays inside.

    Args:
        path: Caller-supplied path, interpreted relative to ``self.root``.
            An absolute path replaces the root when joined and is therefore
            rejected unless it already lies inside the root.

    Returns:
        The fully resolved path (``..`` segments and symlinks collapsed).

    Raises:
        ValueError: If the resolved path lies outside the sandbox root.
    """
    # Resolve the root as well: the candidate below is absolute and
    # symlink-free, so comparing it against a relative or symlinked root
    # would reject every path, including legitimate ones.
    root = Path(self.root).resolve()
    candidate = (root / path).resolve()
    try:
        candidate.relative_to(root)
    except ValueError:
        # `from None` hides the internal relative_to() failure; only the
        # sandbox message is useful to the caller.
        raise ValueError(
            f"path escapes sandbox root: {path!r} resolves outside {self.root}"
        ) from None
    return candidate