forget_source
Remove all indexed content from a named source, including vector store, BM25 index, and SQLite metadata. Use dry run to preview before permanent deletion.
Instructions
Remove all indexed content from a named source.
Deletes every document belonging to *source* from the vector store,
BM25 index, and SQLite metadata. This is irreversible unless the
source is re-synced.
Args:
source: Source name as it appears in the index (e.g. ``"jira"``).
dry_run: Preview without deleting.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| source | Yes | Source name as it appears in the index (e.g. `"jira"`). | |
| dry_run | No | Preview without deleting. | `False` |
Implementation Reference
- The `forget_source` tool handler function. Queries metadata_store for all records matching the given source name, then deletes each record from vector_store, BM25 index, and metadata_store. Supports dry_run mode. Returns status dict with removed/errors counts.
def forget_source(
    source: Annotated[str, "Name of the source to remove from the index."],
    dry_run: Annotated[
        bool,
        "If true, report what would be removed without actually removing it.",
    ] = False,
) -> dict:
    """Purge every indexed document that belongs to a named source.

    Each matching document is removed from the vector store and the BM25
    index, then marked deleted in the metadata store. The operation cannot
    be undone short of re-syncing the source.

    Args:
        source: Source name as it appears in the index (e.g. ``"jira"``).
        dry_run: When true, only report what would be removed.

    Returns:
        The dict produced by ``check_access`` when access is denied;
        otherwise a dict whose ``"status"`` is ``"not_found"`` (no matching
        records), ``"dry_run"`` (match count plus up to 20 sample paths), or
        ``"ok"`` (with ``"removed"`` and ``"errors"`` counts).
    """
    from memorymesh.server.auth_guard import check_access

    # Authorise the destructive action before touching any store.
    denial = check_access(ctx, "delete")
    if denial is not None:
        return denial

    matches = [
        entry
        for entry in ctx.metadata_store.list_files()
        if entry.source_name == source
    ]

    if not matches:
        return {
            "status": "not_found",
            "source": source,
            "message": f"No indexed documents found for source '{source}'.",
        }

    if dry_run:
        # Only the first 20 paths are echoed back; the count covers all matches.
        sample_paths = [entry.path for entry in matches[:20]]
        return {
            "status": "dry_run",
            "source": source,
            "would_remove": len(matches),
            "paths": sample_paths,
        }

    tally = {"removed": 0, "errors": 0}
    for entry in matches:
        try:
            # Order matters: metadata is marked deleted only after both
            # index deletions have succeeded for this path.
            ctx.vector_store.delete_by_path(entry.path)
            ctx.bm25.delete_by_path(entry.path)
            ctx.metadata_store.mark_deleted(entry.path)
        except Exception as exc:
            from loguru import logger

            logger.warning(f"forget_source: failed to remove {entry.path!r}: {exc}")
            tally["errors"] += 1
        else:
            tally["removed"] += 1

    return {"status": "ok", "source": source, **tally}


# - src/memorymesh/server/app.py:108-142 (registration): Import of forget_source module and
#   registration call `forget_source.register(mcp, ctx)` on line 138, wiring the tool into
#   the FastMCP server.
from memorymesh.server.tools import ( ask_memory, forget_memory, forget_source, get_document, get_entity, graph_memory, index_now, list_sources, pin_memory, query_timeline, related_documents, search_by_date, search_memory, summarize_source, sync_source, ) search_memory.register(mcp, ctx) list_sources.register(mcp, ctx) get_document.register(mcp, ctx) index_now.register(mcp, ctx) ask_memory.register(mcp, ctx) pin_memory.register(mcp, ctx) forget_memory.register(mcp, ctx) query_timeline.register(mcp, ctx) sync_source.register(mcp, ctx) get_entity.register(mcp, ctx) related_documents.register(mcp, ctx) search_by_date.register(mcp, ctx) forget_source.register(mcp, ctx) summarize_source.register(mcp, ctx) graph_memory.register(mcp, ctx) return mcp - src/memorymesh/server/tools/forget_source.py:17-28 (registration)The `register()` function that decorates `forget_source` with `@mcp.tool()` to register it as an MCP tool on the FastMCP instance.
def register(mcp: FastMCP, ctx: AppContext) -> None: """Register the ``forget_source`` tool on *mcp* with *ctx* injected. Args: mcp: The FastMCP instance to register onto. ctx: Shared application context (injected via closure). """ @mcp.tool() def forget_source( source: Annotated[str, "Name of the source to remove from the index."], dry_run: Annotated[ - The `check_access` helper function used by the handler to enforce auth (identity resolution, revocation, rate limiting, ACL) for the 'delete' action before proceeding.
def check_access(
    ctx: AppContext,
    action: _Action,
    *,
    source: str | None = None,
    client_id: str | None = None,
    meta: dict | None = None,
) -> dict | None:
    """Perform the full Wave-4 auth stack for a single tool invocation.

    Steps (each step is a no-op when the corresponding component is not
    configured or auth is disabled):

    1. Resolve client identity - from explicit *client_id*, then from MCP
       request metadata, then from the :data:`_CURRENT_CLIENT_ID` context
       variable (set by HTTP middleware), then falls back to anonymous.
    2. Revocation check - deny if the client_id is in the revocation list.
    3. Rate limiting - deny if the token bucket is exhausted.
    4. ACL check - deny if the identity lacks the required permission.

    Unexpected errors raised by the revocation list, rate limiter, or ACL
    enforcer do not deny the request: the failing step is skipped and the
    error is logged at debug level.

    Args:
        ctx: The shared :class:`~memorymesh.server.app.AppContext`.
        action: The operation type - ``"read"``, ``"index"``, ``"delete"``,
            or ``"admin"``.
        source: Optional source name (used by ``check_read`` for
            source-level ACL filtering).
        client_id: Optional explicit client identifier override. When
            provided, overrides MCP metadata and the context variable.
        meta: Optional MCP request metadata. If present, ``_meta.clientId``
            is used before the context variable.

    Returns:
        ``None`` when access is granted, or a ``{"error": str}`` dict to
        return directly from the MCP tool handler.
    """
    # Short-circuit when auth is disabled (default) or no resolver is wired.
    if ctx.identity_resolver is None or not ctx.config.auth.enabled:
        return None

    # Priority: explicit kwarg > MCP metadata > context var > anonymous.
    raw_id = client_id if client_id is not None else _extract_mcp_client_id(meta)
    if raw_id is None:
        raw_id = _CURRENT_CLIENT_ID.get()
    identity = ctx.identity_resolver.resolve(raw_id)

    if ctx.revocation_list is not None:
        try:
            if ctx.revocation_list.is_revoked(identity.client_id):
                return {"error": f"Client {identity.client_id!r} has been revoked."}
        except Exception as exc:
            logger.debug(f"AuthGuard: revocation check skipped after store error: {exc}")

    if ctx.rate_limiter is not None:
        try:
            ctx.rate_limiter.check(identity)
        except Exception as exc:
            # Imported lazily, as in the original, so the exceptions module
            # is only loaded on the failure path.
            from memorymesh.core.exceptions import RateLimitExceededError

            if isinstance(exc, RateLimitExceededError):
                return {"error": f"Rate limit exceeded. Retry after {exc.retry_after_s:.1f}s."}
            # Previously swallowed silently; log it like the revocation branch.
            logger.debug(f"AuthGuard: rate-limit check skipped after limiter error: {exc}")

    if ctx.acl_enforcer is not None:
        try:
            # An action outside the four known values performs no ACL check.
            if action == "read":
                ctx.acl_enforcer.check_read(identity, source=source)
            elif action == "index":
                ctx.acl_enforcer.check_index(identity)
            elif action == "delete":
                ctx.acl_enforcer.check_delete(identity)
            elif action == "admin":
                ctx.acl_enforcer.check_admin(identity)
        except Exception as exc:
            from memorymesh.core.exceptions import PermissionDeniedError

            if isinstance(exc, PermissionDeniedError):
                return {"error": str(exc)}
            # NOTE(review): fail-open on unexpected enforcer errors is kept
            # from the original behaviour - confirm this is intended. The
            # error is now at least logged instead of vanishing.
            logger.debug(f"AuthGuard: ACL check skipped after enforcer error: {exc}")

    return None