"""NotebookLM MCP Server."""
import argparse
import functools
import json
import logging
import os
from typing import Any
from fastmcp import FastMCP
from starlette.requests import Request
from starlette.responses import JSONResponse
from .api_client import NotebookLMClient, extract_cookies_from_chrome_export, parse_timestamp
from . import constants
from . import __version__
# MCP request/response logger
mcp_logger = logging.getLogger("notebooklm_mcp.mcp")
# Initialize MCP server
mcp = FastMCP(
name="notebooklm",
instructions="""NotebookLM MCP - Access NotebookLM (notebooklm.google.com).
**Auth:** If you get authentication errors, run `notebooklm-mcp-auth` via your Bash/terminal tool. This is the automated authentication method that handles everything. Only use save_auth_tokens as a fallback if the CLI fails.
**Confirmation:** Tools with confirm param require user approval before setting confirm=True.
**Studio:** After creating audio/video/infographic/slides, poll studio_status for completion.""",
)
# Health check endpoint for load balancers and monitoring
@mcp.custom_route("/health", methods=["GET"])
async def health_check(request: Request) -> JSONResponse:
"""Health check endpoint for load balancers and monitoring."""
return JSONResponse({
"status": "healthy",
"service": "notebooklm-mcp",
"version": __version__,
})
# Global state
_client: NotebookLMClient | None = None
_query_timeout: float = float(os.environ.get("NOTEBOOKLM_QUERY_TIMEOUT", "120.0"))
def logged_tool():
"""Decorator that combines @mcp.tool() with MCP request/response logging."""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
tool_name = func.__name__
if mcp_logger.isEnabledFor(logging.DEBUG):
# Log request
params = {k: v for k, v in kwargs.items() if v is not None}
mcp_logger.debug(f"MCP Request: {tool_name}({json.dumps(params, default=str)})")
result = func(*args, **kwargs)
if mcp_logger.isEnabledFor(logging.DEBUG):
# Log response (truncate if too long)
result_str = json.dumps(result, default=str)
if len(result_str) > 1000:
result_str = result_str[:1000] + "..."
mcp_logger.debug(f"MCP Response: {tool_name} -> {result_str}")
return result
# Apply the MCP tool decorator
return mcp.tool()(wrapper)
return decorator
def get_client() -> NotebookLMClient:
"""Get or create the API client.
Tries environment variables first, falls back to cached tokens from auth CLI.
"""
global _client
if _client is None:
import os
from .auth import load_cached_tokens
cookie_header = os.environ.get("NOTEBOOKLM_COOKIES", "")
csrf_token = os.environ.get("NOTEBOOKLM_CSRF_TOKEN", "")
session_id = os.environ.get("NOTEBOOKLM_SESSION_ID", "")
if cookie_header:
# Use environment variables
cookies = extract_cookies_from_chrome_export(cookie_header)
else:
# Try cached tokens from auth CLI
cached = load_cached_tokens()
if cached:
cookies = cached.cookies
csrf_token = csrf_token or cached.csrf_token
session_id = session_id or cached.session_id
else:
raise ValueError(
"No authentication found. Either:\n"
"1. Run 'notebooklm-mcp-auth' to authenticate via Chrome, or\n"
"2. Set NOTEBOOKLM_COOKIES environment variable manually"
)
_client = NotebookLMClient(
cookies=cookies,
csrf_token=csrf_token,
session_id=session_id,
)
return _client
@logged_tool()
def refresh_auth() -> dict[str, Any]:
"""Reload auth tokens from disk or run headless re-authentication.
Call this after running notebooklm-mcp-auth to pick up new tokens,
or to attempt automatic re-authentication if Chrome profile has saved login.
Returns status indicating if tokens were refreshed successfully.
"""
global _client
try:
# Try reloading from disk first
from .auth import load_cached_tokens
cached = load_cached_tokens()
if cached:
# Reset client to force re-initialization with fresh tokens
_client = None
get_client() # This will use the cached tokens
return {
"status": "success",
"message": "Auth tokens reloaded from disk cache.",
}
# Try headless auth if Chrome profile exists
try:
from .auth_cli import run_headless_auth
tokens = run_headless_auth()
if tokens:
_client = None
get_client()
return {
"status": "success",
"message": "Auth tokens refreshed via headless Chrome.",
}
except Exception:
pass
return {
"status": "error",
"error": "No cached tokens found. Run 'notebooklm-mcp-auth' to authenticate.",
}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def notebook_list(max_results: int = 100) -> dict[str, Any]:
"""List all notebooks.
Args:
max_results: Maximum number of notebooks to return (default: 100)
"""
try:
client = get_client()
notebooks = client.list_notebooks()
# Count owned vs shared notebooks
owned_count = sum(1 for nb in notebooks if nb.is_owned)
shared_count = len(notebooks) - owned_count
# Count notebooks shared by me (owned + is_shared=True)
shared_by_me_count = sum(1 for nb in notebooks if nb.is_owned and nb.is_shared)
return {
"status": "success",
"count": len(notebooks),
"owned_count": owned_count,
"shared_count": shared_count,
"shared_by_me_count": shared_by_me_count,
"notebooks": [
{
"id": nb.id,
"title": nb.title,
"source_count": nb.source_count,
"url": nb.url,
"ownership": nb.ownership,
"is_shared": nb.is_shared,
"created_at": nb.created_at,
"modified_at": nb.modified_at,
}
for nb in notebooks[:max_results]
],
}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def notebook_create(title: str = "") -> dict[str, Any]:
"""Create a new notebook.
Args:
title: Optional title for the notebook
"""
try:
client = get_client()
notebook = client.create_notebook(title=title)
if notebook:
return {
"status": "success",
"notebook": {
"id": notebook.id,
"title": notebook.title,
"url": notebook.url,
},
}
return {"status": "error", "error": "Failed to create notebook"}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def notebook_get(notebook_id: str) -> dict[str, Any]:
"""Get notebook details with sources.
Args:
notebook_id: Notebook UUID
"""
try:
client = get_client()
result = client.get_notebook(notebook_id)
# Extract timestamps from metadata if available
# Result structure: [title, sources, id, emoji, null, metadata, ...]
# metadata[5] = modified_at, metadata[8] = created_at
created_at = None
modified_at = None
if result and isinstance(result, list) and len(result) > 5:
metadata = result[5]
if isinstance(metadata, list):
if len(metadata) > 5:
modified_at = parse_timestamp(metadata[5])
if len(metadata) > 8:
created_at = parse_timestamp(metadata[8])
return {
"status": "success",
"notebook": result,
"created_at": created_at,
"modified_at": modified_at,
}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def notebook_describe(notebook_id: str) -> dict[str, Any]:
"""Get AI-generated notebook summary with suggested topics.
Args:
notebook_id: Notebook UUID
Returns: summary (markdown), suggested_topics list
"""
try:
client = get_client()
result = client.get_notebook_summary(notebook_id)
return {
"status": "success",
**result, # Includes summary and suggested_topics
}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def source_describe(source_id: str) -> dict[str, Any]:
"""Get AI-generated source summary with keyword chips.
Args:
source_id: Source UUID
Returns: summary (markdown with **bold** keywords), keywords list
"""
try:
client = get_client()
result = client.get_source_guide(source_id)
return {
"status": "success",
**result, # Includes summary and keywords
}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def source_get_content(source_id: str) -> dict[str, Any]:
"""Get raw text content of a source (no AI processing).
Returns the original indexed text from PDFs, web pages, pasted text,
or YouTube transcripts. Much faster than notebook_query for content export.
Args:
source_id: Source UUID
Returns: content (str), title (str), source_type (str), char_count (int)
"""
try:
client = get_client()
result = client.get_source_fulltext(source_id)
return {
"status": "success",
**result, # Includes content, title, source_type, url, char_count
}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def notebook_add_url(notebook_id: str, url: str) -> dict[str, Any]:
"""Add URL (website or YouTube) as source.
Args:
notebook_id: Notebook UUID
url: URL to add
"""
try:
client = get_client()
result = client.add_url_source(notebook_id, url=url)
if result:
return {
"status": "success",
"source": result,
}
return {"status": "error", "error": "Failed to add URL source"}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def notebook_add_text(
notebook_id: str,
text: str,
title: str = "Pasted Text",
) -> dict[str, Any]:
"""Add pasted text as source.
Args:
notebook_id: Notebook UUID
text: Text content to add
title: Optional title
"""
try:
client = get_client()
result = client.add_text_source(notebook_id, text=text, title=title)
if result:
return {
"status": "success",
"source": result,
}
return {"status": "error", "error": "Failed to add text source"}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def notebook_add_drive(
notebook_id: str,
document_id: str,
title: str,
doc_type: str = "doc",
) -> dict[str, Any]:
"""Add Google Drive document as source.
Args:
notebook_id: Notebook UUID
document_id: Drive document ID (from URL)
title: Display title
doc_type: doc|slides|sheets|pdf
"""
try:
mime_types = {
"doc": "application/vnd.google-apps.document",
"docs": "application/vnd.google-apps.document",
"slides": "application/vnd.google-apps.presentation",
"sheets": "application/vnd.google-apps.spreadsheet",
"pdf": "application/pdf",
}
mime_type = mime_types.get(doc_type.lower())
if not mime_type:
return {
"status": "error",
"error": f"Unknown doc_type '{doc_type}'. Use 'doc', 'slides', 'sheets', or 'pdf'.",
}
client = get_client()
result = client.add_drive_source(
notebook_id,
document_id=document_id,
title=title,
mime_type=mime_type,
)
if result:
# Handle timeout status from api_client (large files may timeout on backend)
if isinstance(result, dict) and result.get("status") == "timeout":
return {
"status": "timeout",
"message": result.get("message", "Operation timed out but may have succeeded."),
"hint": "Check notebook sources before retrying to avoid duplicates.",
}
return {
"status": "success",
"source": result,
}
return {"status": "error", "error": "Failed to add Drive source"}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def notebook_query(
notebook_id: str,
query: str,
source_ids: list[str] | str | None = None,
conversation_id: str | None = None,
timeout: float | None = None,
) -> dict[str, Any]:
"""Ask AI about EXISTING sources already in notebook. NOT for finding new sources.
Use research_start instead for: deep research, web search, find new sources, Drive search.
Args:
notebook_id: Notebook UUID
query: Question to ask
source_ids: Source IDs to query (default: all)
conversation_id: For follow-up questions
timeout: Request timeout in seconds (default: from env NOTEBOOKLM_QUERY_TIMEOUT or 120.0)
"""
try:
# Handle AI clients that send source_ids as a JSON string instead of a list
if isinstance(source_ids, str):
import json
try:
source_ids = json.loads(source_ids)
except json.JSONDecodeError:
# If not valid JSON, treat as a single source ID
source_ids = [source_ids]
# Use provided timeout or fall back to global default
effective_timeout = timeout if timeout is not None else _query_timeout
client = get_client()
result = client.query(
notebook_id,
query_text=query,
source_ids=source_ids,
conversation_id=conversation_id,
timeout=effective_timeout,
)
if result:
return {
"status": "success",
"answer": result.get("answer", ""),
"conversation_id": result.get("conversation_id"),
}
return {"status": "error", "error": "Failed to query notebook"}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def notebook_delete(
notebook_id: str,
confirm: bool = False,
) -> dict[str, Any]:
"""Delete notebook permanently. IRREVERSIBLE. Requires confirm=True.
Args:
notebook_id: Notebook UUID
confirm: Must be True after user approval
"""
if not confirm:
return {
"status": "error",
"error": "Deletion not confirmed. You must ask the user to confirm "
"before deleting. Set confirm=True only after user approval.",
"warning": "This action is IRREVERSIBLE. The notebook and all its "
"sources will be permanently deleted.",
}
try:
client = get_client()
result = client.delete_notebook(notebook_id)
if result:
return {
"status": "success",
"message": f"Notebook {notebook_id} has been permanently deleted.",
}
return {"status": "error", "error": "Failed to delete notebook"}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def notebook_rename(
notebook_id: str,
new_title: str,
) -> dict[str, Any]:
"""Rename a notebook.
Args:
notebook_id: Notebook UUID
new_title: New title
"""
try:
client = get_client()
result = client.rename_notebook(notebook_id, new_title)
if result:
return {
"status": "success",
"notebook": {
"id": notebook_id,
"title": new_title,
},
}
return {"status": "error", "error": "Failed to rename notebook"}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def chat_configure(
notebook_id: str,
goal: str = "default",
custom_prompt: str | None = None,
response_length: str = "default",
) -> dict[str, Any]:
"""Configure notebook chat settings.
Args:
notebook_id: Notebook UUID
goal: default|learning_guide|custom
custom_prompt: Required when goal=custom (max 10000 chars)
response_length: default|longer|shorter
"""
try:
client = get_client()
result = client.configure_chat(
notebook_id=notebook_id,
goal=goal,
custom_prompt=custom_prompt,
response_length=response_length,
)
return result
except ValueError as e:
return {"status": "error", "error": str(e)}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def source_list_drive(notebook_id: str) -> dict[str, Any]:
"""List sources with types and Drive freshness status.
Use before source_sync_drive to identify stale sources.
Args:
notebook_id: Notebook UUID
"""
try:
client = get_client()
sources = client.get_notebook_sources_with_types(notebook_id)
# Separate sources by syncability
syncable_sources = []
other_sources = []
for src in sources:
if src.get("can_sync"):
# Check freshness for syncable sources (Drive docs and Gemini Notes)
is_fresh = client.check_source_freshness(src["id"])
src["is_fresh"] = is_fresh
src["needs_sync"] = is_fresh is False
syncable_sources.append(src)
else:
other_sources.append(src)
# Count stale sources
stale_count = sum(1 for s in syncable_sources if s.get("needs_sync"))
return {
"status": "success",
"notebook_id": notebook_id,
"summary": {
"total_sources": len(sources),
"syncable_sources": len(syncable_sources),
"stale_sources": stale_count,
"other_sources": len(other_sources),
},
"syncable_sources": syncable_sources,
"other_sources": [
{
"id": s["id"],
"title": s["title"],
"type": s["source_type_name"],
"url": s.get("url"),
}
for s in other_sources
],
}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def source_sync_drive(
source_ids: list[str],
confirm: bool = False,
) -> dict[str, Any]:
"""Sync Drive sources with latest content. Requires confirm=True.
Call source_list_drive first to identify stale sources.
Args:
source_ids: Source UUIDs to sync
confirm: Must be True after user approval
"""
if not confirm:
return {
"status": "error",
"error": "Sync not confirmed. You must ask the user to confirm "
"before syncing. Set confirm=True only after user approval.",
"hint": "First call source_list_drive to show stale sources, "
"then ask user to confirm before syncing.",
}
if not source_ids:
return {
"status": "error",
"error": "No source_ids provided. Use source_list_drive to get source IDs.",
}
try:
client = get_client()
results = []
synced_count = 0
failed_count = 0
for source_id in source_ids:
try:
result = client.sync_drive_source(source_id)
if result:
results.append({
"source_id": source_id,
"status": "synced",
"title": result.get("title"),
})
synced_count += 1
else:
results.append({
"source_id": source_id,
"status": "failed",
"error": "Sync returned no result",
})
failed_count += 1
except Exception as e:
results.append({
"source_id": source_id,
"status": "failed",
"error": str(e),
})
failed_count += 1
return {
"status": "success" if failed_count == 0 else "partial",
"summary": {
"total": len(source_ids),
"synced": synced_count,
"failed": failed_count,
},
"results": results,
}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def source_delete(
source_id: str,
confirm: bool = False,
) -> dict[str, Any]:
"""Delete source permanently. IRREVERSIBLE. Requires confirm=True.
Args:
source_id: Source UUID to delete
confirm: Must be True after user approval
"""
if not confirm:
return {
"status": "error",
"error": "Deletion not confirmed. You must ask the user to confirm "
"before deleting. Set confirm=True only after user approval.",
"warning": "This action is IRREVERSIBLE. The source will be "
"permanently deleted from the notebook.",
}
try:
client = get_client()
result = client.delete_source(source_id)
if result:
return {
"status": "success",
"message": f"Source {source_id} has been permanently deleted.",
}
return {"status": "error", "error": "Failed to delete source"}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def research_start(
query: str,
source: str = "web",
mode: str = "fast",
notebook_id: str | None = None,
title: str | None = None,
) -> dict[str, Any]:
"""Deep research / fast research: Search web or Google Drive to FIND NEW sources.
Use this for: "deep research on X", "find sources about Y", "search web for Z", "search Drive".
Workflow: research_start -> poll research_status -> research_import.
Args:
query: What to search for (e.g. "quantum computing advances")
source: web|drive (where to search)
mode: fast (~30s, ~10 sources) | deep (~5min, ~40 sources, web only)
notebook_id: Existing notebook (creates new if not provided)
title: Title for new notebook
"""
try:
client = get_client()
# Validate mode + source combination early
if mode.lower() == "deep" and source.lower() == "drive":
return {
"status": "error",
"error": "Deep Research only supports Web sources. Use mode='fast' for Drive.",
}
# Create notebook if needed
if not notebook_id:
notebook_title = title or f"Research: {query[:50]}"
notebook = client.create_notebook(title=notebook_title)
if not notebook:
return {"status": "error", "error": "Failed to create notebook"}
notebook_id = notebook.id
created_notebook = True
else:
created_notebook = False
# Start research
result = client.start_research(
notebook_id=notebook_id,
query=query,
source=source,
mode=mode,
)
if result:
response = {
"status": "success",
"task_id": result["task_id"],
"notebook_id": notebook_id,
"notebook_url": f"https://notebooklm.google.com/notebook/{notebook_id}",
"query": query,
"source": result["source"],
"mode": result["mode"],
"created_notebook": created_notebook,
}
# Add helpful message based on mode
if result["mode"] == "deep":
response["message"] = (
"Deep Research started. This takes 3-5 minutes. "
"Call research_status to check progress."
)
else:
response["message"] = (
"Fast Research started. This takes about 30 seconds. "
"Call research_status to check progress."
)
return response
return {"status": "error", "error": "Failed to start research"}
except ValueError as e:
return {"status": "error", "error": str(e)}
except Exception as e:
return {"status": "error", "error": str(e)}
def _compact_research_result(result: dict) -> dict:
"""Compact research result to save tokens.
Truncates report to 500 chars and limits sources to first 10.
Users can query the notebook for full details.
"""
if not isinstance(result, dict):
return result
# Truncate report if present
if "report" in result and result["report"]:
report = result["report"]
if len(report) > 500:
result["report"] = report[:500] + f"\n\n... (truncated {len(report) - 500} characters. Query the notebook for full details)"
# Limit sources shown
if "sources" in result and isinstance(result["sources"], list):
total_sources = len(result["sources"])
if total_sources > 10:
result["sources"] = result["sources"][:10]
result["sources_truncated"] = f"Showing first 10 of {total_sources} sources. Set compact=False for all sources."
return result
@logged_tool()
def research_status(
notebook_id: str,
poll_interval: int = 30,
max_wait: int = 300,
compact: bool = True,
task_id: str | None = None,
) -> dict[str, Any]:
"""Poll research progress. Blocks until complete or timeout.
Args:
notebook_id: Notebook UUID
poll_interval: Seconds between polls (default: 30)
max_wait: Max seconds to wait (default: 300, 0=single poll)
compact: If True (default), truncate report and limit sources shown to save tokens.
Use compact=False to get full details.
task_id: Optional Task ID to poll for a specific research task.
"""
import time
try:
client = get_client()
start_time = time.time()
polls = 0
while True:
polls += 1
result = client.poll_research(notebook_id, target_task_id=task_id)
if not result:
# If specific task requested but not found, keep waiting (it might appear)
if task_id:
time.sleep(poll_interval)
continue
return {"status": "error", "error": "Failed to poll research status"}
# If completed or no research found, return immediately
if result.get("status") in ("completed", "no_research"):
result["polls_made"] = polls
result["wait_time_seconds"] = round(time.time() - start_time, 1)
# Compact mode: truncate to save tokens
if compact and result.get("status") == "completed":
result = _compact_research_result(result)
return {
"status": "success",
"research": result,
}
# Check if we should stop waiting
elapsed = time.time() - start_time
if max_wait == 0 or elapsed >= max_wait:
result["polls_made"] = polls
result["wait_time_seconds"] = round(elapsed, 1)
result["message"] = (
f"Research still in progress after {round(elapsed, 1)}s. "
f"Call research_status again to continue waiting."
)
# Compact mode even for in-progress
if compact:
result = _compact_research_result(result)
return {
"status": "success",
"research": result,
}
# Wait before next poll
time.sleep(poll_interval)
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def research_import(
notebook_id: str,
task_id: str,
source_indices: list[int] | None = None,
) -> dict[str, Any]:
"""Import discovered sources into notebook.
Call after research_status shows status="completed".
Args:
notebook_id: Notebook UUID
task_id: Research task ID
source_indices: Source indices to import (default: all)
"""
try:
client = get_client()
# First, get the current research results to get source details
poll_result = client.poll_research(notebook_id, target_task_id=task_id)
if not poll_result or poll_result.get("status") == "no_research":
return {
"status": "error",
"error": "No research found for this notebook. Run research_start first.",
}
if poll_result.get("status") != "completed":
return {
"status": "error",
"error": f"Research is still in progress (status: {poll_result.get('status')}). "
"Wait for completion before importing.",
}
# Get sources from poll result
all_sources = poll_result.get("sources", [])
report_content = poll_result.get("report", "")
if not all_sources:
return {
"status": "error",
"error": "No sources found in research results.",
}
# Separate deep_report sources (type 5) from importable web/drive sources
# Deep reports will be imported as text sources, web sources imported normally
deep_report_source = None
web_sources = []
for src in all_sources:
if src.get("result_type") == 5:
deep_report_source = src
else:
web_sources.append(src)
# Filter sources by indices if specified
if source_indices is not None:
sources_to_import = []
invalid_indices = []
for idx in source_indices:
if 0 <= idx < len(all_sources):
sources_to_import.append(all_sources[idx])
else:
invalid_indices.append(idx)
if invalid_indices:
return {
"status": "error",
"error": f"Invalid source indices: {invalid_indices}. "
f"Valid range is 0-{len(all_sources)-1}.",
}
else:
sources_to_import = all_sources
# Import web/drive sources (skip deep_report sources as they don't have URLs)
web_sources_to_import = [s for s in sources_to_import if s.get("result_type") != 5]
imported = client.import_research_sources(
notebook_id=notebook_id,
task_id=task_id,
sources=web_sources_to_import,
)
# If deep research with report, import the report as a text source
if deep_report_source and report_content:
try:
report_result = client.add_text_source(
notebook_id=notebook_id,
title=deep_report_source.get("title", "Deep Research Report"),
text=report_content,
)
if report_result:
imported.append({
"id": report_result.get("id"),
"title": report_result.get("title", "Deep Research Report"),
})
except Exception as e:
# Don't fail the entire import if report import fails
pass
return {
"status": "success",
"imported_count": len(imported),
"total_available": len(all_sources),
"sources": imported,
"notebook_url": f"https://notebooklm.google.com/notebook/{notebook_id}",
}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def audio_overview_create(
notebook_id: str,
source_ids: list[str] | None = None,
format: str = "deep_dive",
length: str = "default",
language: str = "en",
focus_prompt: str = "",
confirm: bool = False,
) -> dict[str, Any]:
"""Generate audio overview. Requires confirm=True after user approval.
Args:
notebook_id: Notebook UUID
source_ids: Source IDs (default: all)
format: deep_dive|brief|critique|debate
length: short|default|long
language: BCP-47 code (en, es, fr, de, ja)
focus_prompt: Optional focus text
confirm: Must be True after user approval
"""
if not confirm:
return {
"status": "pending_confirmation",
"message": "Please confirm these settings before creating the audio overview:",
"settings": {
"notebook_id": notebook_id,
"format": format,
"length": length,
"language": language,
"focus_prompt": focus_prompt or "(none)",
"source_ids": source_ids or "all sources",
},
"note": "Set confirm=True after user approves these settings.",
}
try:
client = get_client()
# Map format string to code
try:
format_code = constants.AUDIO_FORMATS.get_code(format)
except ValueError:
return {
"status": "error",
"error": f"Unknown format '{format}'. Use: {', '.join(constants.AUDIO_FORMATS.names)}",
}
# Map length string to code
try:
length_code = constants.AUDIO_LENGTHS.get_code(length)
except ValueError:
return {
"status": "error",
"error": f"Unknown length '{length}'. Use: {', '.join(constants.AUDIO_LENGTHS.names)}",
}
# Get source IDs if not provided
if source_ids is None:
sources = client.get_notebook_sources_with_types(notebook_id)
source_ids = [s["id"] for s in sources if s["id"]]
if not source_ids:
return {
"status": "error",
"error": "No sources found in notebook. Add sources before creating audio overview.",
}
result = client.create_audio_overview(
notebook_id=notebook_id,
source_ids=source_ids,
format_code=format_code,
length_code=length_code,
language=language,
focus_prompt=focus_prompt,
)
if result:
return {
"status": "success",
"artifact_id": result["artifact_id"],
"type": "audio",
"format": result["format"],
"length": result["length"],
"language": result["language"],
"generation_status": result["status"],
"message": "Audio generation started. Use studio_status to check progress.",
"notebook_url": f"https://notebooklm.google.com/notebook/{notebook_id}",
}
return {"status": "error", "error": "Failed to create audio overview"}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def video_overview_create(
notebook_id: str,
source_ids: list[str] | None = None,
format: str = "explainer",
visual_style: str = "auto_select",
language: str = "en",
focus_prompt: str = "",
confirm: bool = False,
) -> dict[str, Any]:
"""Generate video overview. Requires confirm=True after user approval.
Args:
notebook_id: Notebook UUID
source_ids: Source IDs (default: all)
format: explainer|brief
visual_style: auto_select|classic|whiteboard|kawaii|anime|watercolor|retro_print|heritage|paper_craft
language: BCP-47 code (en, es, fr, de, ja)
focus_prompt: Optional focus text
confirm: Must be True after user approval
"""
if not confirm:
return {
"status": "pending_confirmation",
"message": "Please confirm these settings before creating the video overview:",
"settings": {
"notebook_id": notebook_id,
"format": format,
"visual_style": visual_style,
"language": language,
"focus_prompt": focus_prompt or "(none)",
"source_ids": source_ids or "all sources",
},
"note": "Set confirm=True after user approves these settings.",
}
try:
client = get_client()
# Map format string to code
try:
format_code = constants.VIDEO_FORMATS.get_code(format)
except ValueError:
return {
"status": "error",
"error": f"Unknown format '{format}'. Use: {', '.join(constants.VIDEO_FORMATS.names)}",
}
# Map style string to code
try:
style_code = constants.VIDEO_STYLES.get_code(visual_style)
except ValueError:
return {
"status": "error",
"error": f"Unknown visual_style '{visual_style}'. Use: {', '.join(constants.VIDEO_STYLES.names)}",
}
# Get source IDs if not provided
if source_ids is None:
sources = client.get_notebook_sources_with_types(notebook_id)
source_ids = [s["id"] for s in sources if s["id"]]
if not source_ids:
return {
"status": "error",
"error": "No sources found in notebook. Add sources before creating video overview.",
}
result = client.create_video_overview(
notebook_id=notebook_id,
source_ids=source_ids,
format_code=format_code,
visual_style_code=style_code,
language=language,
focus_prompt=focus_prompt,
)
if result:
return {
"status": "success",
"artifact_id": result["artifact_id"],
"type": "video",
"format": result["format"],
"visual_style": result["visual_style"],
"language": result["language"],
"generation_status": result["status"],
"message": "Video generation started. Use studio_status to check progress.",
"notebook_url": f"https://notebooklm.google.com/notebook/{notebook_id}",
}
return {"status": "error", "error": "Failed to create video overview"}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def studio_status(notebook_id: str) -> dict[str, Any]:
"""Check studio content generation status and get URLs.
Args:
notebook_id: Notebook UUID
"""
try:
client = get_client()
artifacts = client.poll_studio_status(notebook_id)
# Also fetch mind maps and add them as artifacts
try:
mind_maps = client.list_mind_maps(notebook_id)
for mm in mind_maps:
artifacts.append({
"artifact_id": mm.get("mind_map_id"),
"type": "mind_map",
"title": mm.get("title", "Mind Map"),
"status": "completed",
"created_at": mm.get("created_at"),
})
except Exception:
# Don't fail studio_status if mind maps fail
pass
# Separate by status
completed = [a for a in artifacts if a.get("status") == "completed"]
in_progress = [a for a in artifacts if a.get("status") == "in_progress"]
return {
"status": "success",
"notebook_id": notebook_id,
"summary": {
"total": len(artifacts),
"completed": len(completed),
"in_progress": len(in_progress),
},
"artifacts": artifacts,
"notebook_url": f"https://notebooklm.google.com/notebook/{notebook_id}",
}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def studio_delete(
notebook_id: str,
artifact_id: str,
confirm: bool = False,
) -> dict[str, Any]:
"""Delete studio artifact. IRREVERSIBLE. Requires confirm=True.
Args:
notebook_id: Notebook UUID
artifact_id: Artifact UUID (from studio_status)
confirm: Must be True after user approval
"""
if not confirm:
return {
"status": "error",
"error": "Deletion not confirmed. You must ask the user to confirm "
"before deleting. Set confirm=True only after user approval.",
"warning": "This action is IRREVERSIBLE. The artifact will be permanently deleted.",
"hint": "First call studio_status to list artifacts with their IDs and titles.",
}
try:
client = get_client()
result = client.delete_studio_artifact(artifact_id, notebook_id)
if result:
return {
"status": "success",
"message": f"Artifact {artifact_id} has been permanently deleted.",
"notebook_id": notebook_id,
}
return {"status": "error", "error": "Failed to delete artifact"}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def infographic_create(
notebook_id: str,
source_ids: list[str] | None = None,
orientation: str = "landscape",
detail_level: str = "standard",
language: str = "en",
focus_prompt: str = "",
confirm: bool = False,
) -> dict[str, Any]:
"""Generate infographic. Requires confirm=True after user approval.
Args:
notebook_id: Notebook UUID
source_ids: Source IDs (default: all)
orientation: landscape|portrait|square
detail_level: concise|standard|detailed
language: BCP-47 code (en, es, fr, de, ja)
focus_prompt: Optional focus text
confirm: Must be True after user approval
"""
if not confirm:
return {
"status": "pending_confirmation",
"message": "Please confirm these settings before creating the infographic:",
"settings": {
"notebook_id": notebook_id,
"orientation": orientation,
"detail_level": detail_level,
"language": language,
"focus_prompt": focus_prompt or "(none)",
"source_ids": source_ids or "all sources",
},
"note": "Set confirm=True after user approves these settings.",
}
try:
client = get_client()
# Map orientation string to code
try:
orientation_code = constants.INFOGRAPHIC_ORIENTATIONS.get_code(orientation)
except ValueError:
return {
"status": "error",
"error": f"Unknown orientation '{orientation}'. Use: {', '.join(constants.INFOGRAPHIC_ORIENTATIONS.names)}",
}
# Map detail_level string to code
try:
detail_code = constants.INFOGRAPHIC_DETAILS.get_code(detail_level)
except ValueError:
return {
"status": "error",
"error": f"Unknown detail_level '{detail_level}'. Use: {', '.join(constants.INFOGRAPHIC_DETAILS.names)}",
}
# Get source IDs if not provided
if source_ids is None:
sources = client.get_notebook_sources_with_types(notebook_id)
source_ids = [s["id"] for s in sources if s["id"]]
if not source_ids:
return {
"status": "error",
"error": "No sources found in notebook. Add sources before creating infographic.",
}
result = client.create_infographic(
notebook_id=notebook_id,
source_ids=source_ids,
orientation_code=orientation_code,
detail_level_code=detail_code,
language=language,
focus_prompt=focus_prompt,
)
if result:
return {
"status": "success",
"artifact_id": result["artifact_id"],
"type": "infographic",
"orientation": result["orientation"],
"detail_level": result["detail_level"],
"language": result["language"],
"generation_status": result["status"],
"message": "Infographic generation started. Use studio_status to check progress.",
"notebook_url": f"https://notebooklm.google.com/notebook/{notebook_id}",
}
return {"status": "error", "error": "Failed to create infographic"}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def slide_deck_create(
notebook_id: str,
source_ids: list[str] | None = None,
format: str = "detailed_deck",
length: str = "default",
language: str = "en",
focus_prompt: str = "",
confirm: bool = False,
) -> dict[str, Any]:
"""Generate slide deck. Requires confirm=True after user approval.
Args:
notebook_id: Notebook UUID
source_ids: Source IDs (default: all)
format: detailed_deck|presenter_slides
length: short|default
language: BCP-47 code (en, es, fr, de, ja)
focus_prompt: Optional focus text
confirm: Must be True after user approval
"""
if not confirm:
return {
"status": "pending_confirmation",
"message": "Please confirm these settings before creating the slide deck:",
"settings": {
"notebook_id": notebook_id,
"format": format,
"length": length,
"language": language,
"focus_prompt": focus_prompt or "(none)",
"source_ids": source_ids or "all sources",
},
"note": "Set confirm=True after user approves these settings.",
}
try:
client = get_client()
# Map format string to code
try:
format_code = constants.SLIDE_DECK_FORMATS.get_code(format)
except ValueError:
return {
"status": "error",
"error": f"Unknown format '{format}'. Use: {', '.join(constants.SLIDE_DECK_FORMATS.names)}",
}
# Map length string to code
try:
length_code = constants.SLIDE_DECK_LENGTHS.get_code(length)
except ValueError:
return {
"status": "error",
"error": f"Unknown length '{length}'. Use: {', '.join(constants.SLIDE_DECK_LENGTHS.names)}",
}
# Get source IDs if not provided
if source_ids is None:
sources = client.get_notebook_sources_with_types(notebook_id)
source_ids = [s["id"] for s in sources if s["id"]]
if not source_ids:
return {
"status": "error",
"error": "No sources found in notebook. Add sources before creating slide deck.",
}
result = client.create_slide_deck(
notebook_id=notebook_id,
source_ids=source_ids,
format_code=format_code,
length_code=length_code,
language=language,
focus_prompt=focus_prompt,
)
if result:
return {
"status": "success",
"artifact_id": result["artifact_id"],
"type": "slide_deck",
"format": result["format"],
"length": result["length"],
"language": result["language"],
"generation_status": result["status"],
"message": "Slide deck generation started. Use studio_status to check progress.",
"notebook_url": f"https://notebooklm.google.com/notebook/{notebook_id}",
}
return {"status": "error", "error": "Failed to create slide deck"}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def report_create(
notebook_id: str,
source_ids: list[str] | None = None,
report_format: str = "Briefing Doc",
custom_prompt: str = "",
language: str = "en",
confirm: bool = False,
) -> dict[str, Any]:
"""Generate report. Requires confirm=True after user approval.
Args:
notebook_id: Notebook UUID
source_ids: Source IDs (default: all)
report_format: "Briefing Doc"|"Study Guide"|"Blog Post"|"Create Your Own"
custom_prompt: Required for "Create Your Own"
language: BCP-47 code (en, es, fr, de, ja)
confirm: Must be True after user approval
"""
if not confirm:
return {
"status": "pending_confirmation",
"message": "Please confirm these settings before creating the report:",
"settings": {
"notebook_id": notebook_id,
"report_format": report_format,
"language": language,
"custom_prompt": custom_prompt or "(none)",
"source_ids": source_ids or "all sources",
},
"note": "Set confirm=True after user approves these settings.",
}
try:
client = get_client()
# Get source IDs if not provided
if not source_ids:
sources = client.get_notebook_sources_with_types(notebook_id)
source_ids = [s["id"] for s in sources if s.get("id")]
result = client.create_report(
notebook_id=notebook_id,
source_ids=source_ids,
report_format=report_format,
custom_prompt=custom_prompt,
language=language,
)
if result:
return {
"status": "success",
"artifact_id": result["artifact_id"],
"type": "report",
"format": result["format"],
"language": result["language"],
"generation_status": result["status"],
"message": "Report generation started. Use studio_status to check progress.",
"notebook_url": f"https://notebooklm.google.com/notebook/{notebook_id}",
}
return {"status": "error", "error": "Failed to create report"}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def flashcards_create(
notebook_id: str,
source_ids: list[str] | None = None,
difficulty: str = "medium",
confirm: bool = False,
) -> dict[str, Any]:
"""Generate flashcards. Requires confirm=True after user approval.
Args:
notebook_id: Notebook UUID
source_ids: Source IDs (default: all)
difficulty: easy|medium|hard
confirm: Must be True after user approval
"""
if not confirm:
return {
"status": "pending_confirmation",
"message": "Please confirm these settings before creating flashcards:",
"settings": {
"notebook_id": notebook_id,
"difficulty": difficulty,
"source_ids": source_ids or "all sources",
},
"note": "Set confirm=True after user approves these settings.",
}
try:
client = get_client()
# Map difficulty to code
try:
difficulty_code = constants.FLASHCARD_DIFFICULTIES.get_code(difficulty)
except ValueError:
return {
"status": "error",
"error": f"Unknown difficulty '{difficulty}'. Use: {', '.join(constants.FLASHCARD_DIFFICULTIES.names)}",
}
# Get source IDs if not provided
if not source_ids:
sources = client.get_notebook_sources_with_types(notebook_id)
source_ids = [s["id"] for s in sources if s.get("id")]
result = client.create_flashcards(
notebook_id=notebook_id,
source_ids=source_ids,
difficulty_code=difficulty_code,
)
if result:
return {
"status": "success",
"artifact_id": result["artifact_id"],
"type": "flashcards",
"difficulty": result["difficulty"],
"generation_status": result["status"],
"message": "Flashcards generation started. Use studio_status to check progress.",
"notebook_url": f"https://notebooklm.google.com/notebook/{notebook_id}",
}
return {"status": "error", "error": "Failed to create flashcards"}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def quiz_create(
notebook_id: str,
source_ids: list[str] | None = None,
question_count: int = 2,
difficulty: str = "medium",
confirm: bool = False,
) -> dict[str, Any]:
"""Generate quiz. Requires confirm=True after user approval.
Args:
notebook_id: Notebook UUID
source_ids: Source IDs (default: all)
question_count: Number of questions (default: 2)
difficulty: Difficulty level (default: medium)
confirm: Must be True after user approval
"""
if not confirm:
return {
"status": "pending_confirmation",
"message": "Please confirm these settings before creating quiz:",
"settings": {
"notebook_id": notebook_id,
"question_count": question_count,
"difficulty": difficulty,
"source_ids": source_ids or "all sources",
},
"note": "Set confirm=True after user approves these settings.",
}
try:
client = get_client()
# Map difficulty to code
try:
difficulty_code = constants.FLASHCARD_DIFFICULTIES.get_code(difficulty)
except ValueError:
return {
"status": "error",
"error": f"Unknown difficulty '{difficulty}'. Use: {', '.join(constants.FLASHCARD_DIFFICULTIES.names)}",
}
if not source_ids:
sources = client.get_notebook_sources_with_types(notebook_id)
source_ids = [s["id"] for s in sources if s.get("id")]
result = client.create_quiz(
notebook_id=notebook_id,
source_ids=source_ids,
question_count=question_count,
difficulty=difficulty_code,
)
if result:
return {
"status": "success",
"artifact_id": result["artifact_id"],
"type": "quiz",
"question_count": result["question_count"],
"difficulty": result["difficulty"],
"generation_status": result["status"],
"message": "Quiz generation started. Use studio_status to check progress.",
"notebook_url": f"https://notebooklm.google.com/notebook/{notebook_id}",
}
return {"status": "error", "error": "Failed to create quiz"}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def data_table_create(
notebook_id: str,
description: str,
source_ids: list[str] | None = None,
language: str = "en",
confirm: bool = False,
) -> dict[str, Any]:
"""Generate data table. Requires confirm=True after user approval.
Args:
notebook_id: Notebook UUID
description: Description of the data table to create
source_ids: Source IDs (default: all)
language: Language code (default: "en")
confirm: Must be True after user approval
"""
if not confirm:
return {
"status": "pending_confirmation",
"message": "Please confirm these settings before creating data table:",
"settings": {
"notebook_id": notebook_id,
"description": description,
"language": language,
"source_ids": source_ids or "all sources",
},
"note": "Set confirm=True after user approves these settings.",
}
try:
client = get_client()
if not source_ids:
sources = client.get_notebook_sources_with_types(notebook_id)
source_ids = [s["id"] for s in sources if s.get("id")]
result = client.create_data_table(
notebook_id=notebook_id,
source_ids=source_ids,
description=description,
language=language,
)
if result:
return {
"status": "success",
"artifact_id": result["artifact_id"],
"type": "data_table",
"description": result["description"],
"generation_status": result["status"],
"message": "Data table generation started. Use studio_status to check progress.",
"notebook_url": f"https://notebooklm.google.com/notebook/{notebook_id}",
}
return {"status": "error", "error": "Failed to create data table"}
except Exception as e:
return {"status": "error", "error": str(e)}
@logged_tool()
def mind_map_create(
notebook_id: str,
source_ids: list[str] | None = None,
title: str = "Mind Map",
confirm: bool = False,
) -> dict[str, Any]:
"""Generate and save mind map. Requires confirm=True after user approval.
Args:
notebook_id: Notebook UUID
source_ids: Source IDs (default: all)
title: Display title
confirm: Must be True after user approval
"""
if not confirm:
return {
"status": "pending_confirmation",
"message": "Please confirm these settings before creating the mind map:",
"settings": {
"notebook_id": notebook_id,
"title": title,
"source_ids": source_ids or "all sources",
},
"note": "Set confirm=True after user approves these settings.",
}
try:
client = get_client()
# Get source IDs if not provided
if not source_ids:
sources = client.get_notebook_sources_with_types(notebook_id)
source_ids = [s["id"] for s in sources if s.get("id")]
# Step 1: Generate the mind map
gen_result = client.generate_mind_map(source_ids=source_ids)
if not gen_result or not gen_result.get("mind_map_json"):
return {"status": "error", "error": "Failed to generate mind map"}
# Step 2: Save the mind map to the notebook
save_result = client.save_mind_map(
notebook_id=notebook_id,
mind_map_json=gen_result["mind_map_json"],
source_ids=source_ids,
title=title,
)
if save_result:
# Parse the JSON to get structure info
import json
try:
mind_map_data = json.loads(save_result.get("mind_map_json", "{}"))
root_name = mind_map_data.get("name", "Unknown")
children_count = len(mind_map_data.get("children", []))
except json.JSONDecodeError:
root_name = "Unknown"
children_count = 0
return {
"status": "success",
"mind_map_id": save_result["mind_map_id"],
"notebook_id": notebook_id,
"title": save_result.get("title", title),
"root_name": root_name,
"children_count": children_count,
"message": "Mind map created and saved successfully.",
"notebook_url": f"https://notebooklm.google.com/notebook/{notebook_id}",
}
return {"status": "error", "error": "Failed to save mind map"}
except Exception as e:
return {"status": "error", "error": str(e)}
# Essential cookies for NotebookLM API authentication
# Only these are needed - no need to save all 20+ cookies from the browser
ESSENTIAL_COOKIES = [
"SID", "HSID", "SSID", "APISID", "SAPISID", # Core auth cookies
"__Secure-1PSID", "__Secure-3PSID", # Secure session variants
"__Secure-1PAPISID", "__Secure-3PAPISID", # Secure API variants
"OSID", "__Secure-OSID", # Origin-bound session
"__Secure-1PSIDTS", "__Secure-3PSIDTS", # Timestamp tokens (rotate frequently)
"SIDCC", "__Secure-1PSIDCC", "__Secure-3PSIDCC", # Session cookies (rotate frequently)
]
@logged_tool()
def save_auth_tokens(
cookies: str,
csrf_token: str = "",
session_id: str = "",
request_body: str = "",
request_url: str = "",
) -> dict[str, Any]:
"""Save NotebookLM cookies (FALLBACK method - try notebooklm-mcp-auth first!).
IMPORTANT FOR AI ASSISTANTS:
- First, run `notebooklm-mcp-auth` via Bash/terminal (automated, preferred)
- Only use this tool if the automated CLI fails
Args:
cookies: Cookie header from Chrome DevTools (only needed if CLI fails)
csrf_token: Deprecated - auto-extracted
session_id: Deprecated - auto-extracted
request_body: Optional - contains CSRF if extracting manually
request_url: Optional - contains session ID if extracting manually
"""
global _client
try:
import time
import urllib.parse
from .auth import AuthTokens, save_tokens_to_cache
# Parse cookie string to dict
all_cookies = {}
for part in cookies.split("; "):
if "=" in part:
key, value = part.split("=", 1)
all_cookies[key] = value
# Validate required cookies
required = ["SID", "HSID", "SSID", "APISID", "SAPISID"]
missing = [c for c in required if c not in all_cookies]
if missing:
return {
"status": "error",
"error": f"Missing required cookies: {missing}",
}
# Filter to only essential cookies (reduces noise significantly)
cookie_dict = {k: v for k, v in all_cookies.items() if k in ESSENTIAL_COOKIES}
# Try to extract CSRF token from request body if provided
if not csrf_token and request_body:
# Request body format: f.req=...&at=<csrf_token>&
if "at=" in request_body:
# Extract and URL-decode the CSRF token
at_part = request_body.split("at=")[1].split("&")[0]
csrf_token = urllib.parse.unquote(at_part)
# Try to extract session ID from request URL if provided
if not session_id and request_url:
# URL format: ...?f.sid=<session_id>&...
if "f.sid=" in request_url:
sid_part = request_url.split("f.sid=")[1].split("&")[0]
session_id = urllib.parse.unquote(sid_part)
# Create and save tokens
# Note: csrf_token and session_id will be auto-extracted from page on first use if still empty
tokens = AuthTokens(
cookies=cookie_dict,
csrf_token=csrf_token, # May be empty - will be auto-extracted from page
session_id=session_id, # May be empty - will be auto-extracted from page
extracted_at=time.time(),
)
save_tokens_to_cache(tokens)
# Reset client so next call uses fresh tokens
_client = None
from .auth import get_cache_path
# Build status message
if csrf_token and session_id:
token_msg = "CSRF token and session ID extracted from network request - no page fetch needed! ⚡"
elif csrf_token:
token_msg = "CSRF token extracted from network request. Session ID will be auto-extracted on first use."
elif session_id:
token_msg = "Session ID extracted from network request. CSRF token will be auto-extracted on first use."
else:
token_msg = "CSRF token and session ID will be auto-extracted on first API call (~1-2s one-time delay)."
return {
"status": "success",
"message": f"Saved {len(cookie_dict)} essential cookies (filtered from {len(all_cookies)}). {token_msg}",
"cache_path": str(get_cache_path()),
"extracted_csrf": bool(csrf_token),
"extracted_session_id": bool(session_id),
}
except Exception as e:
return {"status": "error", "error": str(e)}
def main():
"""Run the MCP server.
Supports multiple transports:
- stdio (default): For desktop apps like Claude Desktop
- http: Streamable HTTP for network access
- sse: Legacy SSE transport (backwards compatibility)
Configuration via CLI args or environment variables.
"""
parser = argparse.ArgumentParser(
description="NotebookLM MCP Server",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Environment Variables:
NOTEBOOKLM_MCP_TRANSPORT Transport type (stdio, http, sse)
NOTEBOOKLM_MCP_HOST Host to bind (default: 127.0.0.1)
NOTEBOOKLM_MCP_PORT Port to listen on (default: 8000)
NOTEBOOKLM_MCP_PATH MCP endpoint path (default: /mcp)
NOTEBOOKLM_MCP_STATELESS Enable stateless mode for scaling (true/false)
NOTEBOOKLM_MCP_DEBUG Enable debug logging for MCP + API traffic (true/false)
NOTEBOOKLM_QUERY_TIMEOUT Query timeout in seconds (default: 120.0)
Examples:
notebooklm-mcp # Default stdio transport
notebooklm-mcp --transport http # HTTP on localhost:8000
notebooklm-mcp --transport http --port 3000 # HTTP on custom port
notebooklm-mcp --transport http --host 0.0.0.0 # Bind to all interfaces
notebooklm-mcp --debug # Log MCP calls + NotebookLM API traffic
notebooklm-mcp --query-timeout 180 # Set query timeout to 180 seconds
"""
)
parser.add_argument(
"--transport", "-t",
choices=["stdio", "http", "sse"],
default=os.environ.get("NOTEBOOKLM_MCP_TRANSPORT", "stdio"),
help="Transport protocol (default: stdio)"
)
parser.add_argument(
"--host", "-H",
default=os.environ.get("NOTEBOOKLM_MCP_HOST", "127.0.0.1"),
help="Host to bind for HTTP/SSE (default: 127.0.0.1)"
)
parser.add_argument(
"--port", "-p",
type=int,
default=int(os.environ.get("NOTEBOOKLM_MCP_PORT", "8000")),
help="Port for HTTP/SSE transport (default: 8000)"
)
parser.add_argument(
"--path",
default=os.environ.get("NOTEBOOKLM_MCP_PATH", "/mcp"),
help="MCP endpoint path for HTTP (default: /mcp)"
)
parser.add_argument(
"--stateless",
action="store_true",
default=os.environ.get("NOTEBOOKLM_MCP_STATELESS", "").lower() == "true",
help="Enable stateless mode for horizontal scaling"
)
parser.add_argument(
"--debug",
action="store_true",
default=os.environ.get("NOTEBOOKLM_MCP_DEBUG", "").lower() == "true",
help="Enable debug logging (MCP tool calls + NotebookLM API requests/responses)"
)
parser.add_argument(
"--query-timeout",
type=float,
default=float(os.environ.get("NOTEBOOKLM_QUERY_TIMEOUT", "120.0")),
help="Query timeout in seconds (default: 120.0)"
)
args = parser.parse_args()
# Update global query timeout from CLI args
global _query_timeout
_query_timeout = args.query_timeout
# Configure logging
if args.debug:
logging.basicConfig(
level=logging.WARNING, # Suppress most logs
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Shared handler and formatter for debug loggers
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
))
# Enable MCP request/response logging
mcp_logger.setLevel(logging.DEBUG)
mcp_logger.addHandler(handler)
# Enable API request/response logging (between MCP server and NotebookLM API)
api_logger = logging.getLogger("notebooklm_mcp.api")
api_logger.setLevel(logging.DEBUG)
api_logger.addHandler(handler)
print("Debug logging: ENABLED (MCP tool calls + NotebookLM API requests/responses)")
if args.transport == "http":
print(f"Starting NotebookLM MCP server (HTTP) on http://{args.host}:{args.port}{args.path}")
print(f"Health check: http://{args.host}:{args.port}/health")
if args.stateless:
print("Stateless mode: ENABLED (suitable for horizontal scaling)")
mcp.run(
transport="http",
host=args.host,
port=args.port,
path=args.path,
stateless_http=args.stateless,
)
elif args.transport == "sse":
print(f"Starting NotebookLM MCP server (SSE) on http://{args.host}:{args.port}/sse")
print(f"Health check: http://{args.host}:{args.port}/health")
if args.stateless:
print("Stateless mode: ENABLED (suitable for horizontal scaling)")
mcp.run(
transport="sse",
host=args.host,
port=args.port,
stateless_http=args.stateless,
)
else:
# Default: stdio transport (no message - stdio should be silent)
mcp.run()
return 0
if __name__ == "__main__":
exit(main())