code_intel_status
Check code intelligence state including snapshots, files, records, edges, and embeddings to verify indexing completeness and data integrity.
Instructions
Check code intelligence snapshot, file, record, edge, and embedding state.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| collection | No | ||
| repo | No | ||
| snapshot_id | No | ||
| include_historical | No |
Implementation Reference
- The main handler function for the code_intel_status tool. Queries multiple tables (snapshots, records, files, edges, static_runs, static_findings) and returns aggregated status information about the code intelligence database.
def tool_code_intel_status(args: Json) -> Json: filters = status_filters(args) with db.connect() as conn: if not code_intel_tables_exist(conn): return ok({"schema_present": False}) schema_versions = ( schema_migration_versions(conn) if table_regclass_exists(conn, "project_code_intel_schema_migrations") else [] ) snapshots = conn.execute( db.query_sql( query_with_where( """ SELECT s.id, s.collection, s.repo, s.repo_role, s.branch, s.commit_sha, s.tree_sha, s.dirty, s.created_at FROM project_code_intel_snapshots s """, filters.snapshots.clauses, """ ORDER BY s.created_at DESC, s.collection, s.repo """, ) ), filters.snapshots.params, ).fetchall() counts = conn.execute( db.query_sql( query_with_where( """ SELECT r.collection, r.repo, count(*) AS records, count(r.embedding) AS embedded_records FROM project_code_intel_records r """, filters.records.clauses, """ GROUP BY r.collection, r.repo ORDER BY r.collection, r.repo """, ) ), filters.records.params, ).fetchall() by_type = conn.execute( db.query_sql( query_with_where( """ SELECT r.collection, r.repo, r.record_type, count(*) AS count FROM project_code_intel_records r """, filters.records.clauses, """ GROUP BY r.collection, r.repo, r.record_type ORDER BY r.collection, r.repo, r.record_type """, ) ), filters.records.params, ).fetchall() files = conn.execute( db.query_sql( query_with_where( """ SELECT f.collection, f.repo, count(*) AS files, count(*) FILTER (WHERE f.skipped_reason IS NOT NULL) AS skipped_files FROM project_code_intel_files f """, filters.files.clauses, """ GROUP BY f.collection, f.repo ORDER BY f.collection, f.repo """, ) ), filters.files.params, ).fetchall() edges = conn.execute( db.query_sql( query_with_where( """ SELECT e.collection, e.repo, count(*) AS edges FROM project_code_intel_edges e """, filters.edges.clauses, """ GROUP BY e.collection, e.repo ORDER BY e.collection, e.repo """, ) ), filters.edges.params, ).fetchall() static_runs, static_findings = static_status_rows(conn, filters) return ok({ "schema_present": True, "schema_versions": schema_versions, **snapshot_scope_response(args), "snapshots": snapshots, "files": files, "records": counts, "records_by_type": by_type, "edges": edges, "static_runs": static_runs, "static_findings": static_findings, }) - ToolDefinition with description and input_schema for code_intel_status. Accepts optional parameters: collection, repo, snapshot_id, include_historical.
"code_intel_status": ToolDefinition( "Check code intelligence snapshot, file, record, edge, and embedding state.", { "type": "object", "properties": { "collection": {"type": "string"}, "repo": {"type": "string"}, "snapshot_id": {"type": "integer", "minimum": 1}, "include_historical": {"type": "boolean"}, }, "additionalProperties": False, }, ), - src/project_code_intelligence/server.py:522-531 (registration)Registration in the TOOLS dictionary that maps the tool name to its ToolDefinition and handler function.
TOOLS: ToolRegistry = { "code_intel_status": (TOOL_DEFINITIONS["code_intel_status"], tool_code_intel_status), "search_code_intel_text": (TOOL_DEFINITIONS["search_code_intel_text"], tool_search_code_intel_text), "search_code_intel_semantic": (TOOL_DEFINITIONS["search_code_intel_semantic"], tool_search_code_intel_semantic), "get_code_intel_record": (TOOL_DEFINITIONS["get_code_intel_record"], tool_get_code_intel_record), "related_code_intel": (TOOL_DEFINITIONS["related_code_intel"], tool_related_code_intel), "search_static_findings": (TOOL_DEFINITIONS["search_static_findings"], tool_search_static_findings), "get_static_finding": (TOOL_DEFINITIONS["get_static_finding"], tool_get_static_finding), "get_static_code_flow": (TOOL_DEFINITIONS["get_static_code_flow"], tool_get_static_code_flow), } - status_filters() builds the StatusFilters dataclass used by the handler to query each table with appropriate WHERE clauses.
def status_filters(args: Json) -> StatusFilters: snapshot_clauses, snapshot_params = scoped_snapshot_table_collection_repo_clauses(args, "s") record_clauses, record_params = scoped_collection_repo_clauses(args, "r") file_clauses, file_params = scoped_collection_repo_clauses(args, "f") edge_clauses, edge_params = scoped_collection_repo_clauses(args, "e") static_run_clauses, static_run_params = scoped_collection_repo_clauses(args, "r") static_finding_clauses_for_status, static_finding_params = scoped_collection_repo_clauses(args, "f") return StatusFilters( snapshots=ClauseParams(snapshot_clauses, snapshot_params), records=ClauseParams(record_clauses, record_params), files=ClauseParams(file_clauses, file_params), edges=ClauseParams(edge_clauses, edge_params), static_runs=ClauseParams(static_run_clauses, static_run_params), static_findings=ClauseParams(static_finding_clauses_for_status, static_finding_params), ) - static_status_rows helper queried by the handler to get static analysis run and finding counts.
def static_status_rows(conn: db.DbConnection, filters: StatusFilters) -> tuple[list[db.DbRow], list[db.DbRow]]: static_runs = [] static_findings = [] if table_regclass_exists(conn, "project_code_intel_static_runs"): static_runs = conn.execute( db.query_sql( query_with_where( """ SELECT r.collection, r.repo, r.tool_name, count(*) AS runs FROM project_code_intel_static_runs r """, filters.static_runs.clauses, """ GROUP BY r.collection, r.repo, r.tool_name ORDER BY r.collection, r.repo, r.tool_name """, ) ), filters.static_runs.params, ).fetchall() if table_regclass_exists(conn, "project_code_intel_static_findings"): static_findings = conn.execute( db.query_sql( query_with_where( """ SELECT f.collection, f.repo, f.rule_id, f.level, count(*) AS findings FROM project_code_intel_static_findings f """, filters.static_findings.clauses, """ GROUP BY f.collection, f.repo, f.rule_id, f.level ORDER BY f.collection, f.repo, f.rule_id, f.level """, ) ), filters.static_findings.params, ).fetchall() return static_runs, static_findings