list_repo_issues
List issues for a Tangled repository by specifying the owner/repo identifier. Optionally set the maximum number of issues to return (1-100).
Instructions
list issues for a repository
Args: repo: repository identifier in 'owner/repo' format; limit: maximum number of issues to return (1-100)
Returns: ListIssuesResult with list of issues
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| repo | Yes | repository identifier in 'owner/repo' format (e.g., 'zzstoatzz/tangled-mcp') | |
| limit | No | maximum number of issues to return (1-100) | 20 |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| issues | Yes | list of issues (IssueInfo objects) | |
Implementation Reference
- src/tangled_mcp/server.py:179-205 (handler) MCP tool handler that resolves the repo identifier, calls _tangled.list_repo_issues(), and wraps the result in ListIssuesResult.
@tangled_mcp.tool
def list_repo_issues(
    repo: Annotated[
        str,
        Field(
            description="repository identifier in 'owner/repo' format (e.g., 'zzstoatzz/tangled-mcp')"
        ),
    ],
    limit: Annotated[
        int, Field(ge=1, le=100, description="maximum number of issues to return")
    ] = 20,
) -> ListIssuesResult:
    """list issues for a repository

    Args:
        repo: repository identifier in 'owner/repo' format
        limit: maximum number of issues to return (1-100)

    Returns:
        ListIssuesResult with list of issues
    """
    # NOTE(review): the docstring above appears to be surfaced verbatim as the
    # tool's instructions, so its wording is intentionally left unchanged.

    # The resolver hands back a (knot, did/repo) pair; only the second half is
    # needed because issues are read from atproto records rather than XRPC.
    _knot, did_slash_repo = _tangled.resolve_repo_identifier(repo)

    # The tool does not expose pagination, so always start from the first page.
    raw_listing = _tangled.list_repo_issues(did_slash_repo, limit, cursor=None)
    return ListIssuesResult.from_api_response(raw_listing)


# - Core implementation that queries AT Protocol to list issues. Resolves repo AT-URI, lists issue records from 'sh.tangled.repo.issue' collection, filters by repo, and fetches label ops to populate issue labels.
def list_repo_issues(
    repo_id: str, limit: int = 50, cursor: str | None = None
) -> dict[str, Any]:
    """list issues for a repository

    Resolves the repo's AT-URI from the owner's 'sh.tangled.repo' records,
    lists 'sh.tangled.repo.issue' records from the authenticated account,
    keeps the ones whose `repo` field equals that AT-URI, and fills in each
    issue's labels from the account's 'sh.tangled.label.op' records.

    Args:
        repo_id: repository identifier in "did/repo" format
        limit: maximum number of issue records to request; filtering by repo
            happens after the fetch, so fewer issues may be returned
        cursor: pagination cursor

    Returns:
        dict containing "issues" (list of issue dicts) and "cursor" (the
        cursor of the issue listing response)

    Raises:
        RuntimeError: if the client is not authenticated
        ValueError: if repo_id has no "/" or no matching repo record is found
    """
    client = _get_authenticated_client()

    if not client.me:
        raise RuntimeError("client not authenticated")

    # parse repo_id to get owner_did and repo_name
    if "/" not in repo_id:
        raise ValueError(f"invalid repo_id format: {repo_id}")

    owner_did, repo_name = repo_id.split("/", 1)

    # get the repo AT-URI by querying the repo collection
    # NOTE(review): only a single page (up to 100 records) is searched, so a
    # repo beyond the first page would be reported as not found - confirm
    # whether pagination is needed here.
    records = client.com.atproto.repo.list_records(
        models.ComAtprotoRepoListRecords.Params(
            repo=owner_did,
            collection="sh.tangled.repo",
            limit=100,
        )
    )

    repo_at_uri = None
    for record in records.records:
        if (
            name := getattr(record.value, "name", None)
        ) is not None and name == repo_name:
            repo_at_uri = record.uri
            break

    if not repo_at_uri:
        raise ValueError(f"repo not found: {repo_id}")

    # list records from the issue collection
    response = client.com.atproto.repo.list_records(
        models.ComAtprotoRepoListRecords.Params(
            repo=client.me.did,
            collection="sh.tangled.repo.issue",
            limit=limit,
            cursor=cursor,
        )
    )

    # filter issues by repo
    issues = []
    issue_uris = []
    for record in response.records:
        if (
            repo := getattr(record.value, "repo", None)
        ) is not None and repo == repo_at_uri:
            issue_uris.append(record.uri)
            issues.append(
                {
                    "uri": record.uri,
                    "cid": record.cid,
                    "issueId": getattr(record.value, "issueId", 0),
                    "title": getattr(record.value, "title", ""),
                    "body": getattr(record.value, "body", None),
                    "createdAt": getattr(record.value, "createdAt", ""),
                    "labels": [],  # will be populated below
                }
            )

    # fetch label ops and correlate with issues
    if issue_uris:
        label_ops = client.com.atproto.repo.list_records(
            models.ComAtprotoRepoListRecords.Params(
                repo=client.me.did,
                collection="sh.tangled.label.op",
                limit=100,
            )
        )

        # build map of issue_uri -> current label URIs
        # NOTE(review): ops are applied in the order list_records returns them;
        # confirm that order is chronological, otherwise a delete could be
        # processed before the add it is meant to undo.
        issue_labels_map: dict[str, set[str]] = {uri: set() for uri in issue_uris}
        for op_record in label_ops.records:
            op = op_record.value
            subject_uri = getattr(op, "subject", None)
            if subject_uri not in issue_labels_map:
                continue
            current_labels = issue_labels_map[subject_uri]

            # `add` / `delete` may be absent or present-but-None (an op that
            # only adds or only deletes); `or ()` treats both as "no operands"
            # instead of raising TypeError while iterating None.
            for operand in getattr(op, "add", None) or ():
                key = getattr(operand, "key", None)
                if key is not None:
                    current_labels.add(key)
            for operand in getattr(op, "delete", None) or ():
                key = getattr(operand, "key", None)
                if key is not None:
                    current_labels.discard(key)

        # extract label names from URIs and add to issues
        for issue in issues:
            label_uris = issue_labels_map.get(issue["uri"], set())
            issue["labels"] = [uri.split("/")[-1] for uri in label_uris]

    return {"issues": issues, "cursor": response.cursor}


# - ListIssuesResult Pydantic model with from_api_response() classmethod that parses raw API response into IssueInfo objects, filtering out malformed issues.
class ListIssuesResult(BaseModel):
    """result of listing issues"""

    issues: list[IssueInfo]

    @classmethod
    def from_api_response(cls, response: dict[str, Any]) -> "ListIssuesResult":
        """build a ListIssuesResult out of the raw listing dict

        Args:
            response: raw listing dict; its optional "issues" entry holds one
                dict per issue, e.g.
                {"uri": "at://...", "cid": "bafyrei...", "issueId": 1,
                 "title": "...", "body": "...", "createdAt": "..."}

        Returns:
            ListIssuesResult holding one IssueInfo per usable entry
        """
        # Entries without an issueId are treated as malformed and dropped;
        # every remaining dict is expanded into IssueInfo keyword arguments.
        parsed = [
            IssueInfo(**raw)
            for raw in response.get("issues", [])
            if raw.get("issueId") is not None
        ]
        return cls(issues=parsed)


# - src/tangled_mcp/_tangled/__init__.py:9-25 (registration)Re-exports list_repo_issues from _issues module and includes it in __all__ for public API access.
from tangled_mcp._tangled._issues import ( create_issue, delete_issue, list_repo_issues, list_repo_labels, update_issue, ) from tangled_mcp._tangled._pulls import list_repo_pulls __all__ = [ "_get_authenticated_client", "get_service_token", "list_branches", "create_issue", "update_issue", "delete_issue", "list_repo_issues", - tests/test_server.py:32-32 (registration)Test asserting that list_repo_issues is present in the server's exposed tools.
assert "list_repo_issues" in tool_names