join
Match records from two pre-sorted files on a common field, returning JSON with joined data. Use after sorting for accurate results.
Instructions
Join two sorted files on a common field (default: the first whitespace-separated field), performing an inner join: only keys present in both files produce output. Read-only, no side effects. Requires pre-sorted input; run 'sort' on both files first. Returns JSON with joined records. Use to combine related datasets by key. Not for unsorted input: the tool's contract assumes sorted files, and results are not guaranteed otherwise. Not for side-by-side merging without key matching; use 'paste' for that. See also 'paste', 'comm', 'sort'.
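As a rough illustration of the inner-join behavior described above, the sketch below joins two in-memory lists of lines on their first whitespace-separated field. The function name `inner_join` and the sample data are hypothetical and only stand in for the two input files; this is not the tool's own code.

```python
def inner_join(left_lines, right_lines):
    """Inner-join two lists of lines on the first whitespace-separated field."""
    # Index the right-hand lines by key; a key may appear on several lines.
    right_index = {}
    for line in right_lines:
        fields = line.split()
        if fields:
            right_index.setdefault(fields[0], []).append(fields[1:])

    joined = []
    for line in left_lines:
        fields = line.split()
        if not fields:
            continue  # skip blank lines, they carry no key
        # Keys missing from the right side yield nothing (inner join).
        for rest in right_index.get(fields[0], []):
            joined.append(" ".join([fields[0]] + fields[1:] + rest))
    return joined


left = ["1 alice", "2 bob", "4 dave"]      # hypothetical first file
right = ["1 admin", "2 user", "3 guest"]   # hypothetical second file
result = inner_join(left, right)
print(result)  # ['1 alice admin', '2 bob user']
```

Keys `3` and `4` each appear in only one input, so neither shows up in the result.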
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| delimiter | No | Input delimiter. Defaults to any whitespace. | |
| encoding | No | Text encoding. | utf-8 |
| field1 | No | 1-based join field for the first file. | 1 |
| field2 | No | 1-based join field for the second file. | 1 |
| max_lines | No | Maximum JSON records to emit. | DEFAULT_MAX_LINES |
| output_delimiter | No | Delimiter for output fields. | single space |
| paths | Yes | Two files to join. | |
| raw | No | Write joined text without a JSON envelope. | false |
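To make the interaction of `field1`, `field2`, `delimiter`, and `output_delimiter` concrete, here is a minimal sketch that mirrors the field ordering used by the handler in the Implementation Reference: the key comes first, then the remaining fields of the first file, then the remaining fields of the second. The function name `join_records` and the sample rows are hypothetical.

```python
def join_records(left_lines, right_lines, field1=1, field2=1,
                 delimiter=None, output_delimiter=" "):
    """Join lines on 1-based fields; delimiter=None means any whitespace."""
    right_index = {}
    for line in right_lines:
        fields = line.split(delimiter)
        if len(fields) >= field2:
            right_index.setdefault(fields[field2 - 1], []).append(fields)

    out = []
    for line in left_lines:
        left_fields = line.split(delimiter)
        if len(left_fields) < field1:
            continue  # line is too short to contain the join field
        key = left_fields[field1 - 1]
        for right_fields in right_index.get(key, []):
            combined = [key]
            combined += [f for i, f in enumerate(left_fields) if i != field1 - 1]
            combined += [f for i, f in enumerate(right_fields) if i != field2 - 1]
            out.append(output_delimiter.join(combined))
    return out


orders = ["o1,u7,book", "o2,u9,pen"]  # key is the 2nd field
users = ["u7,Ada", "u9,Lin"]          # key is the 1st field
rows = join_records(orders, users, field1=2, field2=1,
                    delimiter=",", output_delimiter="\t")
print(rows)  # ['u7\to1\tbook\tAda', 'u9\to2\tpen\tLin']
```

Note that the join key is moved to the front of each output record regardless of where it sat in the input lines.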
Implementation Reference
- The core handler function for the 'join' tool. It reads two files, indexes the second by a join field, matches lines from the first file by key, and produces joined output records.
```python
def command_join(args: argparse.Namespace) -> dict[str, Any] | bytes:
    if len(args.paths) != 2:
        raise AgentError("invalid_input", "join requires exactly two input files.")
    if args.field1 < 1 or args.field2 < 1:
        raise AgentError("invalid_input", "--field1 and --field2 are 1-based and must be positive.")
    left_lines, left_sources = combined_lines([args.paths[0]], encoding=args.encoding)
    right_lines, right_sources = combined_lines([args.paths[1]], encoding=args.encoding)

    # Index the second file by its join field; one key may map to several lines.
    right_index: dict[str, list[list[str]]] = {}
    for line in right_lines:
        fields = split_fields(line, args.delimiter)
        if len(fields) >= args.field2:
            right_index.setdefault(fields[args.field2 - 1], []).append(fields)

    records: list[JoinRecord] = []
    output_lines: list[str] = []
    for line in left_lines:
        left_fields = split_fields(line, args.delimiter)
        if len(left_fields) < args.field1:
            continue  # line has no join field; skip it
        key = left_fields[args.field1 - 1]
        # Emit one record per matching line in the second file (inner join).
        for right_fields in right_index.get(key, []):
            # Output order: key, remaining left fields, remaining right fields.
            combined = [key] + [field for i, field in enumerate(left_fields) if i != args.field1 - 1]
            combined += [field for i, field in enumerate(right_fields) if i != args.field2 - 1]
            output = args.output_delimiter.join(combined)
            records.append({"key": key, "fields": combined, "line": output})
            output_lines.append(output)

    if args.raw:
        return lines_to_raw(output_lines, encoding=args.encoding)
    emitted, truncated = bounded_lines(records, args.max_lines)
    return {
        "source_paths": left_sources + right_sources,
        "returned_records": len(emitted),
        "total_records": len(records),
        "truncated": truncated,
        "records": emitted,
    }
```

- TypedDict 'JoinRecord' defining the schema for each joined output record (key, fields, line).
```python
class JoinRecord(TypedDict):
    key: str
    fields: list[str]
    line: str
```

- src/aicoreutils/parser/_parser.py:447-456 (registration) Registers the 'join' subcommand with argparse, defining all CLI arguments (paths, field1, field2, delimiter, output-delimiter, encoding, max-lines, raw) and binding the handler function command_join.
```python
p = add_subparser("join", help="Join two files on a selected field.")
p.add_argument("paths", nargs=2, help="Two files to join.")
p.add_argument("--field1", type=int, default=1, help="1-based join field for the first file.")
p.add_argument("--field2", type=int, default=1, help="1-based join field for the second file.")
p.add_argument("--delimiter", help="Input delimiter. Defaults to any whitespace.")
p.add_argument("--output-delimiter", default=" ", help="Delimiter for output fields.")
p.add_argument("--encoding", default="utf-8", help="Text encoding.")
p.add_argument("--max-lines", type=int, default=DEFAULT_MAX_LINES, help="Maximum JSON records to emit.")
p.add_argument("--raw", action="store_true", help="Write joined text without a JSON envelope.")
p.set_defaults(func=command_join)
```

- Helper function split_fields used by command_join to split a line into fields by a delimiter or whitespace.
```python
def split_fields(line: str, delimiter: str | None) -> list[str]:
    """Split fields by the delimiter, or by whitespace if none is given."""
    return line.split(delimiter) if delimiter is not None else line.split()
```
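The two branches of `split_fields` behave differently around repeated separators, which affects which column ends up being the join field. The sketch below repeats the helper so it runs on its own; the sample strings are hypothetical.

```python
def split_fields(line, delimiter):
    """Split by an explicit delimiter, or by runs of whitespace when None."""
    return line.split(delimiter) if delimiter is not None else line.split()


# No delimiter: runs of spaces and tabs collapse, so no empty fields appear.
whitespace_fields = split_fields("a   b\tc", None)

# Explicit delimiter: every occurrence splits, so empty fields are preserved.
comma_fields = split_fields("a,,c", ",")

# An explicit single space is not the same as the whitespace default.
space_fields = split_fields("a  b", " ")

print(whitespace_fields)  # ['a', 'b', 'c']
print(comma_fields)       # ['a', '', 'c']
print(space_fields)       # ['a', '', 'b']
```

With an explicit delimiter, an empty column still counts as a field, so `field1` and `field2` keep pointing at the same column position even when some values are missing.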