dormant_contexts
Identify conversation domains with unresolved questions, acting as a reminder for forgotten topics or tasks you left incomplete.
Instructions
Find abandoned tunnels — domains with open questions you haven't resolved. The 'what have I forgotten?' alarm.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
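| min_importance | No | Minimum importance level to include: `breakthrough`, `significant`, or `routine` | significant |
| limit | No | Maximum number of domains to return | 20 |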
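As a rough illustration of how these parameters are passed, the sketch below builds a JSON-RPC `tools/call` request of the kind an MCP client sends. The helper name `build_dormant_contexts_request` and the `request_id` value are placeholders invented for this example. The defaults mirror the handler signature shown under Implementation Reference. The client-side validation is an extra assumption: the handler itself treats an unknown `min_importance` as the lowest rank rather than rejecting it.

```python
import json

# Importance levels the handler ranks (breakthrough > significant > routine).
ALLOWED_IMPORTANCE = ("breakthrough", "significant", "routine")


def build_dormant_contexts_request(min_importance="significant", limit=20, request_id=1):
    """Build a JSON-RPC 2.0 `tools/call` request for the dormant_contexts tool.

    Hypothetical helper for illustration only; request_id is an arbitrary placeholder.
    """
    # Client-side check (an assumption of this sketch, not server behavior).
    if min_importance not in ALLOWED_IMPORTANCE:
        raise ValueError(f"min_importance must be one of {ALLOWED_IMPORTANCE}")
    if limit < 1:
        raise ValueError("limit must be a positive integer")
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "dormant_contexts",
            "arguments": {"min_importance": min_importance, "limit": limit},
        },
    }


if __name__ == "__main__":
    request = build_dormant_contexts_request(min_importance="breakthrough", limit=5)
    print(json.dumps(request, indent=2))
```

Both arguments are optional, so an empty `arguments` object would also be a valid call and would use the server-side defaults.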
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | Markdown-formatted report, returned as a string | |
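Because `result` is a Markdown string rather than structured data, a caller that wants the domains and questions back as data has to parse it. The sketch below is one possible way to do that, assuming the heading and question markers used by the handler shown under Implementation Reference (`### ` for each domain, `❓` for each question). The function name `parse_dormant_report` and the sample report text are hypothetical.

```python
def parse_dormant_report(report: str) -> dict:
    """Map each '### <domain>' heading to the question lines listed under it.

    Hypothetical client-side helper; it relies on the markers the handler emits.
    """
    domains = {}
    current = None
    for line in report.splitlines():
        if line.startswith("### "):
            # Drop the breakthrough marker so the key is the bare domain name.
            current = line[4:].replace("💎", "").strip()
            domains[current] = []
        elif current is not None and line.strip().startswith("❓"):
            # Remove the leading marker character and surrounding whitespace.
            domains[current].append(line.strip()[1:].strip())
    return domains


if __name__ == "__main__":
    # Invented sample text that follows the handler's output format.
    sample = "\n".join([
        "## 🔴 Dormant Contexts (importance >= significant)\n",
        "_Domains with unresolved open questions_\n",
        "### example-domain 💎",
        "_3 conversations | Stage: unknown_",
        "**2 open questions:**",
        " ❓ First hypothetical question?",
        " ❓ Second hypothetical question?",
        "",
        "_Total: 1 domains with open questions_",
    ])
    print(parse_dormant_report(sample))
```

The fallback output (from raw conversations) lists topics without question lines, so this parser would return an empty list for each topic in that case.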
Implementation Reference
- The `dormant_contexts` tool handler function. It queries the summaries database to find domains with unresolved open questions (abandoned tunnels), supports a `min_importance` filter (`breakthrough`, `significant`, or `routine`) and a `limit`, and falls back to raw conversations when no summaries database is available.
```python
# get_summaries_db, get_conversations, parse_json_field, and _SUMMARIES_HINT
# are defined elsewhere in the package and are not shown in this excerpt.
def dormant_contexts(min_importance: str = "significant", limit: int = 20) -> str:
    """
    Find abandoned tunnels — domains with open questions you haven't resolved.
    The 'what have I forgotten?' alarm.
    """
    db = get_summaries_db()
    if db:
        # One row per domain, with its open questions and importances concatenated
        rows = db.execute("""
            SELECT domain_primary,
                   COUNT(*) as conv_count,
                   GROUP_CONCAT(open_questions, '|||') as all_oq,
                   GROUP_CONCAT(importance, ',') as importances,
                   MAX(thinking_stage) as latest_stage
            FROM summaries
            WHERE domain_primary != '' AND domain_primary IS NOT NULL
            GROUP BY domain_primary
            ORDER BY conv_count DESC
        """).fetchall()
        if not rows:
            return "No domain data found."

        importance_rank = {"breakthrough": 3, "significant": 2, "routine": 1}
        min_rank = importance_rank.get(min_importance, 1)

        results = []
        for domain, count, all_oq_str, importances_str, stage in rows:
            imps = (importances_str or "").split(",")
            # default=0 avoids a ValueError when a domain has no importance values
            max_imp = max(
                (importance_rank.get(i.strip(), 0) for i in imps if i.strip()),
                default=0,
            )
            if max_imp < min_rank:
                continue

            # Collect unique open questions, skipping the "none identified" placeholder
            questions = []
            for chunk in (all_oq_str or "").split("|||"):
                for q in parse_json_field(chunk):
                    if q and q.lower() != "none identified" and q not in questions:
                        questions.append(q)
            if not questions:
                continue

            bt_count = sum(1 for i in imps if i.strip() == "breakthrough")
            results.append((domain, count, questions, stage, bt_count))

        # Most breakthroughs first, then most open questions, then most conversations
        results.sort(key=lambda x: (-x[4], -len(x[2]), -x[1]))

        output = [f"## 🔴 Dormant Contexts (importance >= {min_importance})\n"]
        output.append(f"_Domains with unresolved open questions_\n")
        for domain, count, questions, stage, bt in results[:limit]:
            bt_marker = " 💎" if bt else ""
            output.append(f"### {domain}{bt_marker}")
            output.append(f"_{count} conversations | Stage: {stage or 'unknown'}_")
            output.append(f"**{len(questions)} open questions:**")
            for q in questions[:5]:
                output.append(f" ❓ {q}")
            if len(questions) > 5:
                output.append(f" _... and {len(questions)-5} more_")
            output.append("")
        output.append(f"_Total: {len(results)} domains with open questions_")
        return "\n".join(output)

    # ── Fallback: raw conversations ──
    try:
        con = get_conversations()
    except FileNotFoundError:
        return "No conversation data found. Run the ingest pipeline first."

    rows = con.execute("""
        SELECT conversation_title,
               MAX(created) as last_active,
               COUNT(*) as msgs,
               SUM(CASE WHEN has_question = 1 AND role = 'user' THEN 1 ELSE 0 END) as questions
        FROM conversations
        GROUP BY conversation_title
        HAVING questions > 0
        ORDER BY last_active ASC
    """).fetchall()
    if not rows:
        return "No conversations with questions found."

    output = [f"## 🔴 Dormant Contexts (from raw conversations)\n"]
    output.append(f"_Topics with unanswered questions, oldest first_\n")
    shown = 0
    for title, last_active, msgs, q_count in rows:
        if shown >= limit:
            break
        if not title:
            continue
        date_str = str(last_active)[:10] if last_active else "unknown"
        output.append(f"### {title}")
        output.append(f"_{msgs} messages | {q_count} questions | Last active: {date_str}_")
        output.append("")
        shown += 1
    output.append(f"\n_Total: {len(rows)} topics with open questions_")
    output.append(_SUMMARIES_HINT)
    return "\n".join(output)
```
- brain_mcp/server/server.py:39-49 (registration) Registration of the `tools_prosthetic` module in the MCP server (the module that contains `dormant_contexts`). The `register()` function from `tools_prosthetic` is called on line 47.
```python
from . import tools_prosthetic
from . import tools_github
from . import tools_analytics

tools_conversations.register(mcp)
tools_search.register(mcp)
tools_synthesis.register(mcp)
tools_stats.register(mcp)
tools_prosthetic.register(mcp)
tools_github.register(mcp)
tools_analytics.register(mcp)
```
- brain_mcp/server/tools_prosthetic.py:149-150 (registration) The `register()` function that decorates all prosthetic tools (including `dormant_contexts`) with `@mcp.tool()` to register them with the MCP server.
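```python
def register(mcp):
    """Register prosthetic tools with the MCP server."""
```
- Dashboard tool registry entry for `dormant_contexts`, defining its name, description, category (Cognitive Prosthetic), required dependencies (summaries), and parameter schema (`min_importance: float=0`, `limit: int=5`). These parameter types and defaults differ from the handler signature (`min_importance: str = "significant"`, `limit: int = 20`).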
```python
{
    "name": "dormant_contexts",
    "description": "Find abandoned tunnels / dormant domains",
    "category": "Cognitive Prosthetic",
    "requires": ["summaries"],
    "params": {"min_importance": "float=0", "limit": "int=5"},
    "probe": {},
},
```
- Helper function `_domain_fallback_search()`, used when summaries are not available. It performs a keyword-based search of raw conversations: first by the domain name itself, then by expanded keywords. The `dormant_contexts` handler shown above does not call this helper directly; its fallback path queries the `conversations` table itself.
```python
def _domain_fallback_search(con, domain: str, limit: int):
    """Search for domain-related conversations using expanded keywords."""
    # First try exact domain name
    pattern = f"%{domain}%"
    rows = con.execute("""
        SELECT conversation_title, content, created, source
        FROM conversations
        WHERE (content ILIKE ? OR conversation_title ILIKE ?)
          AND role = 'user'
        ORDER BY created DESC
        LIMIT ?
    """, [pattern, pattern, limit * 3]).fetchall()
    if rows:
        return rows

    # Expand to related keywords
    keywords = _expand_domain_search(domain)
    for kw in keywords.split():
        if len(kw) < 3:
            continue
        kw_pattern = f"%{kw}%"
        kw_rows = con.execute("""
            SELECT conversation_title, content, created, source
            FROM conversations
            WHERE (content ILIKE ? OR conversation_title ILIKE ?)
              AND role = 'user'
            ORDER BY created DESC
            LIMIT ?
        """, [kw_pattern, kw_pattern, limit]).fetchall()
        rows.extend(kw_rows)

    # Deduplicate by content
    seen = set()
    unique = []
    for r in rows:
        key = (r[1] or "")[:100]
        if key not in seen:
            seen.add(key)
            unique.append(r)
    return unique[:limit * 3]
```
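The summary-based path of the `dormant_contexts` handler (aggregate per domain, filter by importance, deduplicate questions, sort) can be tried in isolation. The sketch below is a minimal reconstruction under several assumptions: it uses an in-memory `sqlite3` database as a stand-in for the real summaries store, it reduces the `summaries` table to the three columns it needs, and it supplies its own `parse_json_field`, since the real helper is not shown in the source. The sample rows are invented.

```python
import json
import sqlite3

IMPORTANCE_RANK = {"breakthrough": 3, "significant": 2, "routine": 1}


def parse_json_field(raw):
    """Hypothetical stand-in: decode a JSON list string, returning [] on bad input."""
    try:
        value = json.loads(raw) if raw else []
    except (TypeError, ValueError):
        return []
    return value if isinstance(value, list) else []


def rank_dormant_domains(db, min_importance="significant"):
    """Return (domain, conv_count, questions, breakthrough_count) tuples, ranked."""
    rows = db.execute("""
        SELECT domain_primary,
               COUNT(*),
               GROUP_CONCAT(open_questions, '|||'),
               GROUP_CONCAT(importance, ',')
        FROM summaries
        WHERE domain_primary != '' AND domain_primary IS NOT NULL
        GROUP BY domain_primary
    """).fetchall()
    min_rank = IMPORTANCE_RANK.get(min_importance, 1)
    results = []
    for domain, count, all_oq, importances in rows:
        imps = [i.strip() for i in (importances or "").split(",") if i.strip()]
        # Keep the domain only if its highest importance meets the threshold.
        if max((IMPORTANCE_RANK.get(i, 0) for i in imps), default=0) < min_rank:
            continue
        questions = []
        for chunk in (all_oq or "").split("|||"):
            for q in parse_json_field(chunk):
                if (isinstance(q, str) and q
                        and q.lower() != "none identified" and q not in questions):
                    questions.append(q)
        if questions:
            results.append((domain, count, questions, imps.count("breakthrough")))
    # Most breakthroughs first, then most open questions, then most conversations.
    results.sort(key=lambda r: (-r[3], -len(r[2]), -r[1]))
    return results


# Invented sample data for the demonstration.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE summaries (domain_primary TEXT, open_questions TEXT, importance TEXT)")
db.executemany("INSERT INTO summaries VALUES (?, ?, ?)", [
    ("alpha", '["Q1?", "Q2?"]', "significant"),
    ("alpha", '["Q2?"]', "routine"),
    ("beta", '["Q3?"]', "breakthrough"),
    ("gamma", '["none identified"]', "significant"),
    ("delta", '["Q4?"]', "routine"),
])

if __name__ == "__main__":
    for domain, count, questions, bt in rank_dormant_domains(db):
        print(domain, count, questions, bt)
```

With the default threshold, `gamma` is dropped because its only question is the "none identified" placeholder, and `delta` is dropped because it never rises above `routine`. Passing `min_importance="routine"` would bring `delta` back at the end of the ranking.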