# sync_source

Fetch and index documents from external connectors into the search index. Supports running a single connector or all enabled connectors, with an optional dry-run mode that fetches documents without indexing them.
## Instructions

Fetch documents from one or all configured external connectors.

Runs the enabled connectors defined in `config.yaml` and indexes each fetched document into the search index.

Args:
- `source_type`: Connector type key to run, or `null` for all.
- `dry_run`: Fetch without indexing.
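As an illustration, the common invocations correspond to the argument payloads below. The parameter names and defaults come from the schema; the `with_defaults` helper is a hypothetical stand-in for the server-side default handling, not part of the codebase.

```python
# Illustrative sync_source argument payloads, as a client would send them.
sync_all = {}                                        # all enabled connectors
sync_jira = {"source_type": "jira"}                  # a single connector
preview = {"source_type": "slack", "dry_run": True}  # fetch without indexing


def with_defaults(payload: dict) -> dict:
    """Apply the documented defaults: source_type=None, dry_run=False."""
    return {"source_type": None, "dry_run": False, **payload}


print(with_defaults(sync_all))
# → {'source_type': None, 'dry_run': False}
```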
## Input Schema

| Name | Required | Description | Default |
|---|---|---|---|
| source_type | No | Connector type key to run, or `null` for all enabled connectors. | `null` |
| dry_run | No | Fetch documents without writing to the index. | `false` |
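To make the dry-run and error-accounting semantics concrete, here is a self-contained sketch of the sync loop with stub connectors standing in for the real registry and indexer. All names in this sketch (`StubConnector`, `run_sync`, `_indexed`) are hypothetical illustrations, not part of the codebase.

```python
# Sketch of the sync loop: a failing connector is recorded per-entry, and
# dry_run counts documents without "indexing" them.
class StubConnector:
    def __init__(self, docs, fail=False):
        self.docs, self.fail = docs, fail

    def fetch_documents(self):
        if self.fail:
            raise RuntimeError("connection refused")
        yield from self.docs


def run_sync(connectors: dict[str, StubConnector], dry_run: bool = False) -> dict:
    indexed = []          # stand-in for the real search index
    total_docs = total_errors = 0
    results = []
    for ctype, conn in connectors.items():
        entry = {"type": ctype, "docs": 0, "errors": 0}
        doc_count = 0
        try:
            for doc in conn.fetch_documents():
                doc_count += 1
                if not dry_run:
                    indexed.append(doc)  # stand-in for index_parsed_document
        except Exception as exc:
            entry["errors"] += 1
            entry["error"] = str(exc)
            total_errors += 1
        entry["docs"] = doc_count
        results.append(entry)
        total_docs += doc_count
    return {"status": "ok", "dry_run": dry_run, "total_docs": total_docs,
            "total_errors": total_errors, "connectors": results,
            "_indexed": indexed}


out = run_sync({"jira": StubConnector(["a", "b"]),
                "slack": StubConnector([], fail=True)}, dry_run=True)
# dry_run=True: both jira docs are counted but nothing reaches the index;
# the slack failure is recorded on its own entry without aborting the run.
```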
## Implementation Reference
- **Tool handler**: the `sync_source` MCP tool handler fetches documents from the configured external data connectors (e.g. `jira`, `slack`) and indexes them into the MemoryMesh stores. It accepts an optional `source_type` filter and a `dry_run` flag, and returns a status, document counts, and per-connector results.

```python
def sync_source(
    source_type: Annotated[
        str | None,
        (
            "Connector type to sync, e.g. 'jira' or 'slack'. "
            "Pass null to sync all enabled connectors."
        ),
    ] = None,
    dry_run: Annotated[
        bool,
        "If true, fetch documents but do not write to the index.",
    ] = False,
) -> dict:
    """Fetch documents from one or all configured external connectors.

    Runs the enabled connectors defined in ``config.yaml`` and indexes
    each fetched document into the search index.

    Args:
        source_type: Connector type key to run, or ``null`` for all.
        dry_run: Fetch without indexing.
    """
    from memorymesh.connectors.registry import get_connector_classes
    from memorymesh.server.auth_guard import check_access

    # ``ctx`` is the shared AppContext injected via closure in register().
    if (err := check_access(ctx, "index")) is not None:
        return err

    connectors_to_run = [
        c
        for c in ctx.config.connectors
        if c.enabled and (source_type is None or c.type == source_type)
    ]
    if not connectors_to_run:
        label = f"'{source_type}'" if source_type else "any"
        return {
            "status": "no_connectors",
            "message": f"No enabled connector found matching {label}.",
        }

    total_docs = 0
    total_errors = 0
    results: list[dict] = []
    for conn_cfg in connectors_to_run:
        entry: dict = {"type": conn_cfg.type, "docs": 0, "errors": 0}
        try:
            cfg_cls, conn_cls = get_connector_classes(conn_cfg.type)
            config_obj = cfg_cls(**conn_cfg.config)
            connector = conn_cls(config_obj)
        except Exception as exc:  # Exception already covers KeyError
            entry["errors"] = 1
            entry["error"] = str(exc)
            results.append(entry)
            total_errors += 1
            continue

        source_name = getattr(config_obj, "source_name", conn_cfg.type)
        doc_count = 0
        try:
            for doc in connector.fetch_documents():
                doc_count += 1
                if not dry_run:
                    result = ctx.indexer.index_parsed_document(doc, source_name)
                    if result.status == "parse_error":
                        entry["errors"] += 1
                        total_errors += 1
        except Exception as exc:
            entry["errors"] += 1
            entry["error"] = str(exc)
            total_errors += 1

        entry["docs"] = doc_count
        results.append(entry)
        total_docs += doc_count

    return {
        "status": "ok",
        "dry_run": dry_run,
        "total_docs": total_docs,
        "total_errors": total_errors,
        "connectors": results,
    }
```

- **Input parameters**: `source_type` (optional string or `null`; defaults to `None`, meaning all enabled connectors) and `dry_run` (boolean, defaults to `False`).
```python
    source_type: Annotated[
        str | None,
        (
            "Connector type to sync, e.g. 'jira' or 'slack'. "
            "Pass null to sync all enabled connectors."
        ),
    ] = None,
    dry_run: Annotated[
        bool,
        "If true, fetch documents but do not write to the index.",
    ] = False,
) -> dict:
```

- **Registration** (`src/memorymesh/server/tools/sync_source.py:17-25`): the `register()` function registers `sync_source` on the FastMCP instance via the `@mcp.tool()` decorator, with `AppContext` injected via closure.
```python
def register(mcp: FastMCP, ctx: AppContext) -> None:
    """Register the ``sync_source`` tool on *mcp* with *ctx* injected.

    Args:
        mcp: The FastMCP instance to register onto.
        ctx: Shared application context (injected via closure).
    """

    @mcp.tool()
```

- **Wiring** (`src/memorymesh/server/app.py:134`): the registration call in `app.py` that wires the tool into the main MCP server.
```python
sync_source.register(mcp, ctx)
```

- **Imports**: the handler resolves connector classes via `memorymesh.connectors.registry.get_connector_classes` and authorizes the `index` action via `memorymesh.server.auth_guard.check_access`.
```python
from memorymesh.connectors.registry import get_connector_classes
from memorymesh.server.auth_guard import check_access

if (err := check_access(ctx, "index")) is not None:
    return err
```
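The registry itself is not shown in this reference. A minimal sketch of the shape the handler expects, i.e. `get_connector_classes(type)` returning a `(config_class, connector_class)` pair and raising `KeyError` for unknown types, might look like the following. All names here (`JiraConfig`, `JiraConnector`, `_REGISTRY`) are illustrative assumptions, not taken from the codebase.

```python
from dataclasses import dataclass


@dataclass
class JiraConfig:
    base_url: str
    source_name: str = "jira"


class JiraConnector:
    def __init__(self, config: JiraConfig):
        self.config = config

    def fetch_documents(self):
        # A real connector would page through the source API here.
        yield from ()


_REGISTRY = {"jira": (JiraConfig, JiraConnector)}


def get_connector_classes(connector_type: str):
    """Resolve a connector type key to its (config, connector) classes."""
    try:
        return _REGISTRY[connector_type]
    except KeyError:
        raise KeyError(f"Unknown connector type: {connector_type!r}")


cfg_cls, conn_cls = get_connector_classes("jira")
connector = conn_cls(cfg_cls(base_url="https://example.atlassian.net"))
```

This mirrors why the handler's `try` block around connector construction catches exceptions per connector: an unknown type key or a bad config raises before any documents are fetched, and the run continues with the next connector.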