sync_tool
Manually update the local transcript index by syncing new or changed Dropbox podcast transcripts, enabling up-to-date search within Claude Code.
Instructions
Manually trigger a sync from Dropbox to update the local transcript index.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | | |
Implementation Reference
- The sync_tool handler function decorated with @mcp.tool(). It calls sync(config, conn) from dropbox_sync module and returns a formatted result string with added/updated/deleted/total counts.
@mcp.tool()
def sync_tool() -> str:
    """Manually trigger a sync from Dropbox to update the local transcript index."""
    # NOTE: the docstring above is kept verbatim; it appears to double as the
    # tool description shown to MCP clients.
    #
    # Returns a one-line summary with the added/updated/deleted/total counts
    # reported by sync(), or "Sync failed: <error>" if anything goes wrong.
    try:
        # Config and connection lookups live inside the try so that a setup
        # problem (presumably e.g. missing Dropbox credentials) is reported
        # as "Sync failed: ..." like any other error instead of escaping the
        # handler as an unhandled exception.
        config = _get_config()
        conn = _get_conn()
        result = sync(config, conn)
        return (
            f"Sync complete: {result['added']} added, {result['updated']} updated, "
            f"{result['deleted']} deleted. {result['total']} total episodes indexed."
        )
    except Exception as e:
        # Tool boundary: surface the failure as text rather than raising.
        return f"Sync failed: {e}"
# - src/dropbox_transcripts_mcp/server.py:98-98 (registration) The @mcp.tool() decorator registers sync_tool as an MCP tool on the FastMCP server instance.
@mcp.tool() - The sync() function called by sync_tool. Lists .txt files from Dropbox, compares file hashes with the local DB, downloads new/changed files, deletes removed episodes, and updates the sync timestamp.
def sync(config: Config, conn: sqlite3.Connection) -> dict:
    """Bring the local episode index in line with the Dropbox transcript folder.

    Lists the ``.txt`` files in ``config.folder_path``, downloads every file
    whose Dropbox content hash differs from the stored one, removes episodes
    that no longer exist remotely, and records the sync timestamp.

    Args:
        config: Settings used to build the Dropbox client and locate the folder.
        conn: Open connection to the local transcript database.

    Returns:
        A dict with integer counts under the keys ``"added"``, ``"updated"``,
        ``"deleted"`` and ``"total"``. ``"total"`` is the number of episodes
        in the local index once the sync has finished.

    Raises:
        ApiError: If listing the Dropbox folder fails (logged, then re-raised).
            Failures on individual files are logged and skipped instead.
    """
    dbx = get_client(config)

    # Collect all .txt files from the Dropbox folder
    remote: dict[str, FileMetadata] = {}
    try:
        result = dbx.files_list_folder(config.folder_path)
        while True:
            for entry in result.entries:
                if isinstance(entry, FileMetadata) and entry.name.endswith(".txt"):
                    name = _name_from_filename(entry.name)
                    remote[name] = entry
            if not result.has_more:
                break
            # The listing is paginated; keep following the cursor.
            result = dbx.files_list_folder_continue(result.cursor)
    except ApiError as e:
        logger.error("Failed to list Dropbox folder %s: %s", config.folder_path, e)
        raise

    local_hashes = get_all_file_hashes(conn)
    local_names = get_all_names(conn)
    remote_names = set(remote.keys())

    added = updated = deleted = 0

    for name, entry in remote.items():
        # An unchanged content hash means the stored copy is current.
        if local_hashes.get(name) == entry.content_hash:
            continue
        try:
            _, response = dbx.files_download(entry.path_lower)
            content = response.content.decode("utf-8", errors="replace")
            changed = upsert_episode(conn, name, content, entry.content_hash)
            if changed:
                if name in local_names:
                    updated += 1
                else:
                    added += 1
        except Exception:
            # Best effort: one bad file must not abort the whole sync.
            logger.exception("Failed to download %s", entry.path_lower)

    for name in local_names - remote_names:
        delete_episode(conn, name)
        deleted += 1

    set_last_sync(conn, datetime.now(timezone.utc).isoformat())

    # Count what is actually in the index rather than what Dropbox listed:
    # a new file whose download failed above was never indexed, so reporting
    # len(remote_names) would overstate the total.
    total = len(get_all_names(conn))

    logger.info("Sync complete: +%d ~%d -%d (total %d)", added, updated, deleted, total)
    return {"added": added, "updated": updated, "deleted": deleted, "total": total}
# - The Config dataclass used by sync_tool (via _get_config()) to access Dropbox credentials, folder path, and sync interval.
@dataclass
class Config:
    """Runtime settings for the Dropbox transcript sync, as built by load_config()."""

    app_key: str  # DROPBOX_APP_KEY (required)
    app_secret: str  # DROPBOX_APP_SECRET (required)
    refresh_token: str  # DROPBOX_REFRESH_TOKEN (required)
    folder_path: str  # DROPBOX_FOLDER_PATH, default "/Podcasts/Lenny"
    sync_interval_hours: int  # SYNC_INTERVAL_HOURS, default 6
    db_path: str  # DB_PATH, default ~/.dropbox-transcripts-mcp/transcripts.db


def load_config() -> Config:
    """Build a Config from environment variables.

    Returns:
        A Config populated from the environment, with defaults applied to the
        optional settings.

    Raises:
        RuntimeError: If any required Dropbox credential variable is unset or
            empty.
        ValueError: If SYNC_INTERVAL_HOURS is set but is not an integer.
    """
    missing = [
        v
        for v in ("DROPBOX_APP_KEY", "DROPBOX_APP_SECRET", "DROPBOX_REFRESH_TOKEN")
        if not os.environ.get(v)
    ]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")

    raw_hours = os.environ.get("SYNC_INTERVAL_HOURS", "6")
    try:
        sync_interval_hours = int(raw_hours)
    except ValueError:
        # Same exception type as the bare int() call, but the message now says
        # which setting is malformed.
        raise ValueError(
            f"SYNC_INTERVAL_HOURS must be an integer, got {raw_hours!r}"
        ) from None

    return Config(
        app_key=os.environ["DROPBOX_APP_KEY"],
        app_secret=os.environ["DROPBOX_APP_SECRET"],
        refresh_token=os.environ["DROPBOX_REFRESH_TOKEN"],
        folder_path=os.environ.get("DROPBOX_FOLDER_PATH", "/Podcasts/Lenny"),
        sync_interval_hours=sync_interval_hours,
        db_path=os.environ.get(
            "DB_PATH",
            os.path.expanduser("~/.dropbox-transcripts-mcp/transcripts.db"),
        ),
    )
# - Database helper functions (get_all_file_hashes, get_all_names, upsert_episode, delete_episode, set_last_sync) used by sync() which is called by sync_tool.
def get_all_file_hashes(conn: sqlite3.Connection) -> dict[str, str]: rows = conn.execute("SELECT name, file_hash FROM episodes").fetchall() return {r["name"]: r["file_hash"] for r in rows} def get_all_names(conn: sqlite3.Connection) -> set[str]: rows = conn.execute("SELECT name FROM episodes").fetchall() return {r["name"] for r in rows} def get_last_sync(conn: sqlite3.Connection) -> str | None: row = conn.execute("SELECT value FROM sync_state WHERE key = 'last_sync'").fetchone() return row["value"] if row else None def set_last_sync(conn: sqlite3.Connection, timestamp: str) -> None: conn.execute( "INSERT INTO sync_state (key, value) VALUES ('last_sync', ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value", (timestamp,), ) conn.commit()