"""
Zotero MCP server implementation.
"""
from typing import Any, Dict, List, Literal, Optional, Union
import os
import uuid
import tempfile
from fastmcp import Context, FastMCP
from zotero_mcp.client import (
AttachmentDetails,
convert_to_markdown,
format_item_metadata,
generate_bibtex,
get_attachment_details,
get_zotero_client,
)
from zotero_mcp.utils import format_creators
# Create an MCP server with appropriate dependencies
mcp = FastMCP(
"Zotero",
dependencies=["pyzotero", "mcp[cli]", "python-dotenv", "markitdown", "fastmcp"],
)
@mcp.tool(
name="zotero_search_items",
description="Search for items in your Zotero library, given a query string."
)
def search_items(
query: str,
qmode: Literal["titleCreatorYear", "everything"] = "titleCreatorYear",
item_type: str = "-attachment", # Exclude attachments by default
limit: Optional[int] = 10,
tag: Optional[List[str]] = None,
*,
ctx: Context
) -> str:
"""
Search for items in your Zotero library.
Args:
query: Search query string
qmode: Query mode (titleCreatorYear or everything)
item_type: Type of items to search for. Use "-attachment" to exclude attachments.
limit: Maximum number of results to return
tag: List of tags conditions to filter by
ctx: MCP context
Returns:
Markdown-formatted search results
"""
try:
if not query.strip():
return "Error: Search query cannot be empty"
tag_condition_str = ""
if tag:
tag_condition_str = f" with tags: '{', '.join(tag)}'"
else :
tag = []
ctx.info(f"Searching Zotero for '{query}'{tag_condition_str}")
zot = get_zotero_client()
# Search using the query parameters
zot.add_parameters(q=query, qmode=qmode, itemType=item_type, limit=limit, tag=tag)
results = zot.items()
if not results:
return f"No items found matching query: '{query}'{tag_condition_str}"
# Format results as markdown
output = [f"# Search Results for '{query}'", f"{tag_condition_str}", ""]
for i, item in enumerate(results, 1):
data = item.get("data", {})
title = data.get("title", "Untitled")
item_type = data.get("itemType", "unknown")
date = data.get("date", "No date")
key = item.get("key", "")
# Format creators
creators = data.get("creators", [])
creators_str = format_creators(creators)
# Build the formatted entry
output.append(f"## {i}. {title}")
output.append(f"**Type:** {item_type}")
output.append(f"**Item Key:** {key}")
output.append(f"**Date:** {date}")
output.append(f"**Authors:** {creators_str}")
# Add abstract snippet if present
if abstract := data.get("abstractNote"):
# Limit abstract length for search results
abstract_snippet = abstract[:200] + "..." if len(abstract) > 200 else abstract
output.append(f"**Abstract:** {abstract_snippet}")
# Add tags if present
if tags := data.get("tags"):
tag_list = [f"`{tag['tag']}`" for tag in tags]
if tag_list:
output.append(f"**Tags:** {' '.join(tag_list)}")
output.append("") # Empty line between items
return "\n".join(output)
except Exception as e:
ctx.error(f"Error searching Zotero: {str(e)}")
return f"Error searching Zotero: {str(e)}"
@mcp.tool(
name="zotero_search_by_tag",
description="Search for items in your Zotero library by tag. " \
"Conditions are ANDed, each term supports disjunction`||` and exclusion`-`."
)
def search_by_tag(
tag: List[str],
item_type: str = "-attachment",
limit: Optional[int] = 10,
*,
ctx: Context
) -> str:
"""
Search for items in your Zotero library by tag。
Conditions are ANDed, each term supports disjunction`||` and exclusion`-`.
Args:
tag: List of tag conditions. Items are returned only if they satisfy
ALL conditions in the list. Each tag condition can be expressed
in two ways:
As alternatives: tag1 || tag2 (matches items with either tag1 OR tag2)
As exclusions: -tag (matches items that do NOT have this tag)
For example, a tag field with ["research || important", "-draft"] would
return items that:
Have either "research" OR "important" tags, AND
Do NOT have the "draft" tag
item_type: Type of items to search for. Use "-attachment" to exclude attachments.
limit: Maximum number of results to return
ctx: MCP context
Returns:
Markdown-formatted search results
"""
try:
if not tag:
return "Error: Tag cannot be empty"
ctx.info(f"Searching Zotero for tag '{tag}'")
zot = get_zotero_client()
# Search using the query parameters
zot.add_parameters(q="", tag=tag, itemType=item_type, limit=limit)
results = zot.items()
if not results:
return f"No items found with tag: '{tag}'"
# Format results as markdown
output = [f"# Search Results for Tag: '{tag}'", ""]
for i, item in enumerate(results, 1):
data = item.get("data", {})
title = data.get("title", "Untitled")
item_type = data.get("itemType", "unknown")
date = data.get("date", "No date")
key = item.get("key", "")
# Format creators
creators = data.get("creators", [])
creators_str = format_creators(creators)
# Build the formatted entry
output.append(f"## {i}. {title}")
output.append(f"**Type:** {item_type}")
output.append(f"**Item Key:** {key}")
output.append(f"**Date:** {date}")
output.append(f"**Authors:** {creators_str}")
# Add abstract snippet if present
if abstract := data.get("abstractNote"):
# Limit abstract length for search results
abstract_snippet = abstract[:200] + "..." if len(abstract) > 200 else abstract
output.append(f"**Abstract:** {abstract_snippet}")
# Add tags if present
if tags := data.get("tags"):
tag_list = [f"`{tag['tag']}`" for tag in tags]
if tag_list:
output.append(f"**Tags:** {' '.join(tag_list)}")
output.append("") # Empty line between items
return "\n".join(output)
except Exception as e:
ctx.error(f"Error searching Zotero: {str(e)}")
return f"Error searching Zotero: {str(e)}"
@mcp.tool(
name="zotero_get_item_metadata",
description="Get detailed metadata for a specific Zotero item by its key."
)
def get_item_metadata(
item_key: str,
include_abstract: bool = True,
format: Literal["markdown", "bibtex"] = "markdown",
*,
ctx: Context
) -> str:
"""
Get detailed metadata for a Zotero item.
Args:
item_key: Zotero item key/ID
include_abstract: Whether to include the abstract in the output (markdown format only)
format: Output format - 'markdown' for detailed metadata or 'bibtex' for BibTeX citation
ctx: MCP context
Returns:
Formatted item metadata (markdown or BibTeX)
"""
try:
ctx.info(f"Fetching metadata for item {item_key} in {format} format")
zot = get_zotero_client()
item = zot.item(item_key)
if not item:
return f"No item found with key: {item_key}"
if format == "bibtex":
return generate_bibtex(item)
else:
return format_item_metadata(item, include_abstract)
except Exception as e:
ctx.error(f"Error fetching item metadata: {str(e)}")
return f"Error fetching item metadata: {str(e)}"
@mcp.tool(
name="zotero_get_item_fulltext",
description="Get the full text content of a Zotero item by its key."
)
def get_item_fulltext(
item_key: str,
*,
ctx: Context
) -> str:
"""
Get the full text content of a Zotero item.
Args:
item_key: Zotero item key/ID
ctx: MCP context
Returns:
Markdown-formatted item full text
"""
try:
ctx.info(f"Fetching full text for item {item_key}")
zot = get_zotero_client()
# First get the item metadata
item = zot.item(item_key)
if not item:
return f"No item found with key: {item_key}"
# Get item metadata in markdown format
metadata = format_item_metadata(item, include_abstract=True)
# Try to get attachment details
attachment = get_attachment_details(zot, item)
if not attachment:
return f"{metadata}\n\n---\n\nNo suitable attachment found for this item."
ctx.info(f"Found attachment: {attachment.key} ({attachment.content_type})")
# Try fetching full text from Zotero's full text index first
try:
full_text_data = zot.fulltext_item(attachment.key)
if full_text_data and "content" in full_text_data and full_text_data["content"]:
ctx.info("Successfully retrieved full text from Zotero's index")
return f"{metadata}\n\n---\n\n## Full Text\n\n{full_text_data['content']}"
except Exception as fulltext_error:
ctx.info(f"Couldn't retrieve indexed full text: {str(fulltext_error)}")
# If we couldn't get indexed full text, try to download and convert the file
try:
ctx.info(f"Attempting to download and convert attachment {attachment.key}")
# Download the file to a temporary location
import tempfile
import os
with tempfile.TemporaryDirectory() as tmpdir:
file_path = os.path.join(tmpdir, attachment.filename or f"{attachment.key}.pdf")
zot.dump(attachment.key, filename=os.path.basename(file_path), path=tmpdir)
if os.path.exists(file_path):
ctx.info(f"Downloaded file to {file_path}, converting to markdown")
converted_text = convert_to_markdown(file_path)
return f"{metadata}\n\n---\n\n## Full Text\n\n{converted_text}"
else:
return f"{metadata}\n\n---\n\nFile download failed."
except Exception as download_error:
ctx.error(f"Error downloading/converting file: {str(download_error)}")
return f"{metadata}\n\n---\n\nError accessing attachment: {str(download_error)}"
except Exception as e:
ctx.error(f"Error fetching item full text: {str(e)}")
return f"Error fetching item full text: {str(e)}"
@mcp.tool(
name="zotero_get_collections",
description="List all collections in your Zotero library."
)
def get_collections(
limit: Optional[int] = None,
*,
ctx: Context
) -> str:
"""
List all collections in your Zotero library.
Args:
limit: Maximum number of collections to return
ctx: MCP context
Returns:
Markdown-formatted list of collections
"""
try:
ctx.info("Fetching collections")
zot = get_zotero_client()
collections = zot.collections(limit=limit)
# Always return the header, even if empty
output = ["# Zotero Collections", ""]
if not collections:
output.append("No collections found in your Zotero library.")
return "\n".join(output)
# Create a mapping of collection IDs to their data
collection_map = {c["key"]: c for c in collections}
# Create a mapping of parent to child collections
# Only add entries for collections that actually exist
hierarchy = {}
for coll in collections:
parent_key = coll["data"].get("parentCollection")
# Handle various representations of "no parent"
if parent_key in ["", None] or not parent_key:
parent_key = None # Normalize to None
if parent_key not in hierarchy:
hierarchy[parent_key] = []
hierarchy[parent_key].append(coll["key"])
# Function to recursively format collections
def format_collection(key, level=0):
if key not in collection_map:
return []
coll = collection_map[key]
name = coll["data"].get("name", "Unnamed Collection")
# Create indentation for hierarchy
indent = " " * level
lines = [f"{indent}- **{name}** (Key: {key})"]
# Add children if they exist
child_keys = hierarchy.get(key, [])
for child_key in sorted(child_keys): # Sort for consistent output
lines.extend(format_collection(child_key, level + 1))
return lines
# Start with top-level collections (those with None as parent)
top_level_keys = hierarchy.get(None, [])
if not top_level_keys:
# If no clear hierarchy, just list all collections
output.append("Collections (flat list):")
for coll in sorted(collections, key=lambda x: x["data"].get("name", "")):
name = coll["data"].get("name", "Unnamed Collection")
key = coll["key"]
output.append(f"- **{name}** (Key: {key})")
else:
# Display hierarchical structure
for key in sorted(top_level_keys):
output.extend(format_collection(key))
return "\n".join(output)
except Exception as e:
ctx.error(f"Error fetching collections: {str(e)}")
error_msg = f"Error fetching collections: {str(e)}"
return f"# Zotero Collections\n\n{error_msg}"
@mcp.tool(
name="zotero_get_collection_items",
description="Get all items in a specific Zotero collection."
)
def get_collection_items(
collection_key: str,
limit: Optional[int] = 50,
*,
ctx: Context
) -> str:
"""
Get all items in a specific Zotero collection.
Args:
collection_key: The collection key/ID
limit: Maximum number of items to return
ctx: MCP context
Returns:
Markdown-formatted list of items in the collection
"""
try:
ctx.info(f"Fetching items for collection {collection_key}")
zot = get_zotero_client()
# First get the collection details
try:
collection = zot.collection(collection_key)
collection_name = collection["data"].get("name", "Unnamed Collection")
except Exception:
collection_name = f"Collection {collection_key}"
# Then get the items
items = zot.collection_items(collection_key, limit=limit)
if not items:
return f"No items found in collection: {collection_name} (Key: {collection_key})"
# Format items as markdown
output = [f"# Items in Collection: {collection_name}", ""]
for i, item in enumerate(items, 1):
data = item.get("data", {})
title = data.get("title", "Untitled")
item_type = data.get("itemType", "unknown")
date = data.get("date", "No date")
key = item.get("key", "")
# Format creators
creators = data.get("creators", [])
creators_str = format_creators(creators)
# Build the formatted entry
output.append(f"## {i}. {title}")
output.append(f"**Type:** {item_type}")
output.append(f"**Item Key:** {key}")
output.append(f"**Date:** {date}")
output.append(f"**Authors:** {creators_str}")
output.append("") # Empty line between items
return "\n".join(output)
except Exception as e:
ctx.error(f"Error fetching collection items: {str(e)}")
return f"Error fetching collection items: {str(e)}"
@mcp.tool(
name="zotero_get_item_children",
description="Get all child items (attachments, notes) for a specific Zotero item."
)
def get_item_children(
item_key: str,
*,
ctx: Context
) -> str:
"""
Get all child items (attachments, notes) for a specific Zotero item.
Args:
item_key: Zotero item key/ID
ctx: MCP context
Returns:
Markdown-formatted list of child items
"""
try:
ctx.info(f"Fetching children for item {item_key}")
zot = get_zotero_client()
# First get the parent item details
try:
parent = zot.item(item_key)
parent_title = parent["data"].get("title", "Untitled Item")
except Exception:
parent_title = f"Item {item_key}"
# Then get the children
children = zot.children(item_key)
if not children:
return f"No child items found for: {parent_title} (Key: {item_key})"
# Format children as markdown
output = [f"# Child Items for: {parent_title}", ""]
# Group children by type
attachments = []
notes = []
others = []
for child in children:
data = child.get("data", {})
item_type = data.get("itemType", "unknown")
if item_type == "attachment":
attachments.append(child)
elif item_type == "note":
notes.append(child)
else:
others.append(child)
# Format attachments
if attachments:
output.append("## Attachments")
for i, att in enumerate(attachments, 1):
data = att.get("data", {})
title = data.get("title", "Untitled")
key = att.get("key", "")
content_type = data.get("contentType", "Unknown")
filename = data.get("filename", "")
output.append(f"{i}. **{title}**")
output.append(f" - Key: {key}")
output.append(f" - Type: {content_type}")
if filename:
output.append(f" - Filename: {filename}")
output.append("")
# Format notes
if notes:
output.append("## Notes")
for i, note in enumerate(notes, 1):
data = note.get("data", {})
title = data.get("title", "Untitled Note")
key = note.get("key", "")
note_text = data.get("note", "")
# Clean up HTML in notes
note_text = note_text.replace("<p>", "").replace("</p>", "\n\n")
note_text = note_text.replace("<br/>", "\n").replace("<br>", "\n")
# Limit note length for display
if len(note_text) > 500:
note_text = note_text[:500] + "...\n\n(Note truncated)"
output.append(f"{i}. **{title}**")
output.append(f" - Key: {key}")
output.append(f" - Content:\n```\n{note_text}\n```")
output.append("")
# Format other item types
if others:
output.append("## Other Items")
for i, other in enumerate(others, 1):
data = other.get("data", {})
title = data.get("title", "Untitled")
key = other.get("key", "")
item_type = data.get("itemType", "unknown")
output.append(f"{i}. **{title}**")
output.append(f" - Key: {key}")
output.append(f" - Type: {item_type}")
output.append("")
return "\n".join(output)
except Exception as e:
ctx.error(f"Error fetching item children: {str(e)}")
return f"Error fetching item children: {str(e)}"
@mcp.tool(
name="zotero_get_tags",
description="Get all tags used in your Zotero library."
)
def get_tags(
limit: Optional[int] = None,
*,
ctx: Context
) -> str:
"""
Get all tags used in your Zotero library.
Args:
limit: Maximum number of tags to return
ctx: MCP context
Returns:
Markdown-formatted list of tags
"""
try:
ctx.info("Fetching tags")
zot = get_zotero_client()
tags = zot.tags(limit=limit)
if not tags:
return "No tags found in your Zotero library."
# Format tags as markdown
output = ["# Zotero Tags", ""]
# Sort tags alphabetically
sorted_tags = sorted(tags)
# Group tags alphabetically
current_letter = None
for tag in sorted_tags:
first_letter = tag[0].upper() if tag else "#"
if first_letter != current_letter:
current_letter = first_letter
output.append(f"## {current_letter}")
output.append(f"- `{tag}`")
return "\n".join(output)
except Exception as e:
ctx.error(f"Error fetching tags: {str(e)}")
return f"Error fetching tags: {str(e)}"
@mcp.tool(
name="zotero_get_recent",
description="Get recently added items to your Zotero library."
)
def get_recent(
limit: int = 10,
*,
ctx: Context
) -> str:
"""
Get recently added items to your Zotero library.
Args:
limit: Number of items to return
ctx: MCP context
Returns:
Markdown-formatted list of recent items
"""
try:
ctx.info(f"Fetching {limit} recent items")
zot = get_zotero_client()
# Ensure limit is a reasonable number
if limit <= 0:
limit = 10
elif limit > 100:
limit = 100
# Get recent items
items = zot.items(limit=limit, sort="dateAdded", direction="desc")
if not items:
return "No items found in your Zotero library."
# Format items as markdown
output = [f"# {limit} Most Recently Added Items", ""]
for i, item in enumerate(items, 1):
data = item.get("data", {})
title = data.get("title", "Untitled")
item_type = data.get("itemType", "unknown")
date = data.get("date", "No date")
key = item.get("key", "")
date_added = data.get("dateAdded", "Unknown")
# Format creators
creators = data.get("creators", [])
creators_str = format_creators(creators)
# Build the formatted entry
output.append(f"## {i}. {title}")
output.append(f"**Type:** {item_type}")
output.append(f"**Item Key:** {key}")
output.append(f"**Date:** {date}")
output.append(f"**Added:** {date_added}")
output.append(f"**Authors:** {creators_str}")
output.append("") # Empty line between items
return "\n".join(output)
except Exception as e:
ctx.error(f"Error fetching recent items: {str(e)}")
return f"Error fetching recent items: {str(e)}"
@mcp.tool(
name="zotero_batch_update_tags",
description="Batch update tags across multiple items matching a search query."
)
def batch_update_tags(
query: str,
add_tags: Optional[List[str]] = None,
remove_tags: Optional[List[str]] = None,
limit: int = 50,
*,
ctx: Context
) -> str:
"""
Batch update tags across multiple items matching a search query.
Args:
query: Search query to find items to update
add_tags: List of tags to add to matched items
remove_tags: List of tags to remove from matched items
limit: Maximum number of items to process
ctx: MCP context
Returns:
Summary of the batch update
"""
try:
if not query:
return "Error: Search query cannot be empty"
if not add_tags and not remove_tags:
return "Error: You must specify either tags to add or tags to remove"
ctx.info(f"Batch updating tags for items matching '{query}'")
zot = get_zotero_client()
# Search for items matching the query
zot.add_parameters(q=query, limit=limit)
items = zot.items()
if not items:
return f"No items found matching query: '{query}'"
# Initialize counters
updated_count = 0
skipped_count = 0
added_tag_counts = {tag: 0 for tag in (add_tags or [])}
removed_tag_counts = {tag: 0 for tag in (remove_tags or [])}
# Process each item
for item in items:
# Skip attachments if they were included in the results
if item["data"].get("itemType") == "attachment":
skipped_count += 1
continue
# Get current tags
current_tags = item["data"].get("tags", [])
current_tag_values = {t["tag"] for t in current_tags}
# Track if this item needs to be updated
needs_update = False
# Process tags to remove
if remove_tags:
new_tags = []
for tag_obj in current_tags:
tag = tag_obj["tag"]
if tag in remove_tags:
removed_tag_counts[tag] += 1
needs_update = True
else:
new_tags.append(tag_obj)
current_tags = new_tags
# Process tags to add
if add_tags:
for tag in add_tags:
if tag and tag not in current_tag_values:
current_tags.append({"tag": tag})
added_tag_counts[tag] += 1
needs_update = True
# Update the item if needed
if needs_update:
item["data"]["tags"] = current_tags
zot.update_item(item)
updated_count += 1
else:
skipped_count += 1
# Format the response
response = ["# Batch Tag Update Results", ""]
response.append(f"Query: '{query}'")
response.append(f"Items processed: {len(items)}")
response.append(f"Items updated: {updated_count}")
response.append(f"Items skipped: {skipped_count}")
if add_tags:
response.append("\n## Tags Added")
for tag, count in added_tag_counts.items():
response.append(f"- `{tag}`: {count} items")
if remove_tags:
response.append("\n## Tags Removed")
for tag, count in removed_tag_counts.items():
response.append(f"- `{tag}`: {count} items")
return "\n".join(response)
except Exception as e:
ctx.error(f"Error in batch tag update: {str(e)}")
return f"Error in batch tag update: {str(e)}"
@mcp.tool(
name="zotero_advanced_search",
description="Perform an advanced search with multiple criteria."
)
def advanced_search(
conditions: List[Dict[str, str]],
join_mode: Literal["all", "any"] = "all",
sort_by: Optional[str] = None,
sort_direction: Literal["asc", "desc"] = "asc",
limit: int = 50,
*,
ctx: Context
) -> str:
"""
Perform an advanced search with multiple criteria.
Args:
conditions: List of search condition dictionaries, each containing:
- field: The field to search (title, creator, date, tag, etc.)
- operation: The operation to perform (is, isNot, contains, etc.)
- value: The value to search for
join_mode: Whether all conditions must match ("all") or any condition can match ("any")
sort_by: Field to sort by (dateAdded, dateModified, title, creator, etc.)
sort_direction: Direction to sort (asc or desc)
limit: Maximum number of results to return
ctx: MCP context
Returns:
Markdown-formatted search results
"""
try:
if not conditions:
return "Error: No search conditions provided"
ctx.info(f"Performing advanced search with {len(conditions)} conditions")
zot = get_zotero_client()
# Prepare search parameters
params = {}
# Add sorting parameters if specified
if sort_by:
params["sort"] = sort_by
params["direction"] = sort_direction
# Add limit parameter
params["limit"] = limit
# Build search conditions
search_conditions = []
for i, condition in enumerate(conditions):
if "field" not in condition or "operation" not in condition or "value" not in condition:
return f"Error: Condition {i+1} is missing required fields (field, operation, value)"
# Map common field names to Zotero API fields if needed
field = condition["field"]
operation = condition["operation"]
value = condition["value"]
# Handle special fields
if field == "author" or field == "creator":
field = "creator"
elif field == "year":
field = "date"
# Convert year to partial date format for matching
value = str(value)
search_conditions.append({
"condition": field,
"operator": operation,
"value": value
})
# Add join mode condition
search_conditions.append({
"condition": "joinMode",
"operator": join_mode,
"value": ""
})
# Create a saved search
search_name = f"temp_search_{uuid.uuid4().hex[:8]}"
saved_search = zot.saved_search(
search_name,
search_conditions
)
# Extract the search key from the result
if not saved_search.get("success"):
return f"Error creating saved search: {saved_search.get('failed', 'Unknown error')}"
search_key = next(iter(saved_search.get("success", {}).values()), None)
# Execute the saved search
try:
results = zot.collection_items(search_key)
finally:
# Clean up the temporary saved search
try:
zot.delete_saved_search([search_key])
except Exception as cleanup_error:
ctx.warn(f"Error cleaning up saved search: {str(cleanup_error)}")
# Format the results
if not results:
return "No items found matching the search criteria."
output = ["# Advanced Search Results", ""]
output.append(f"Found {len(results)} items matching the search criteria:")
output.append("")
# Add search criteria summary
output.append("## Search Criteria")
output.append(f"Join mode: {join_mode.upper()}")
for i, condition in enumerate(conditions, 1):
output.append(f"{i}. {condition['field']} {condition['operation']} \"{condition['value']}\"")
output.append("")
# Format results
output.append("## Results")
for i, item in enumerate(results, 1):
data = item.get("data", {})
title = data.get("title", "Untitled")
item_type = data.get("itemType", "unknown")
date = data.get("date", "No date")
key = item.get("key", "")
# Format creators
creators = data.get("creators", [])
creators_str = format_creators(creators)
# Build the formatted entry
output.append(f"### {i}. {title}")
output.append(f"**Type:** {item_type}")
output.append(f"**Item Key:** {key}")
output.append(f"**Date:** {date}")
output.append(f"**Authors:** {creators_str}")
# Add abstract snippet if present
if abstract := data.get("abstractNote"):
# Limit abstract length for search results
abstract_snippet = abstract[:150] + "..." if len(abstract) > 150 else abstract
output.append(f"**Abstract:** {abstract_snippet}")
# Add tags if present
if tags := data.get("tags"):
tag_list = [f"`{tag['tag']}`" for tag in tags]
if tag_list:
output.append(f"**Tags:** {' '.join(tag_list)}")
output.append("") # Empty line between items
return "\n".join(output)
except Exception as e:
ctx.error(f"Error in advanced search: {str(e)}")
return f"Error in advanced search: {str(e)}"
@mcp.tool(
name="zotero_get_annotations",
description="Get all annotations for a specific item or across your entire Zotero library."
)
def get_annotations(
item_key: Optional[str] = None,
use_pdf_extraction: bool = False,
limit: Optional[int] = None,
*,
ctx: Context
) -> str:
"""
Get annotations from your Zotero library.
Args:
item_key: Optional Zotero item key/ID to filter annotations by parent item
use_pdf_extraction: Whether to attempt direct PDF extraction as a fallback
limit: Maximum number of annotations to return
ctx: MCP context
Returns:
Markdown-formatted list of annotations
"""
try:
# Initialize Zotero client
zot = get_zotero_client()
# Prepare annotations list
annotations = []
parent_title = "Untitled Item"
# If an item key is provided, use specialized retrieval
if item_key:
# First, verify the item exists and get its details
try:
parent = zot.item(item_key)
parent_title = parent["data"].get("title", "Untitled Item")
ctx.info(f"Fetching annotations for item: {parent_title}")
except Exception:
return f"Error: No item found with key: {item_key}"
# Initialize annotation sources
better_bibtex_annotations = []
zotero_api_annotations = []
pdf_annotations = []
# Try Better BibTeX method (local Zotero only)
if os.environ.get("ZOTERO_LOCAL", "").lower() in ["true", "yes", "1"]:
try:
# Import Better BibTeX dependencies
from zotero_mcp.better_bibtex_client import (
ZoteroBetterBibTexAPI,
process_annotation,
get_color_category
)
# Initialize Better BibTeX client
bibtex = ZoteroBetterBibTexAPI()
# Check if Zotero with Better BibTeX is running
if bibtex.is_zotero_running():
# Extract citation key
citation_key = None
# Try to find citation key in Extra field
try:
extra_field = parent["data"].get("extra", "")
for line in extra_field.split("\n"):
if line.lower().startswith("citation key:"):
citation_key = line.replace("Citation Key:", "").strip()
break
elif line.lower().startswith("citationkey:"):
citation_key = line.replace("citationkey:", "").strip()
break
except Exception as e:
ctx.warn(f"Error extracting citation key from Extra field: {e}")
# Fallback to searching by title if no citation key found
if not citation_key:
title = parent["data"].get("title", "")
try:
if title:
# Use the search_citekeys method
search_results = bibtex.search_citekeys(title)
# Find the matching item
for result in search_results:
ctx.info(f"Checking result: {result}")
# Try to match with item key if possible
if result.get('citekey'):
citation_key = result['citekey']
break
except Exception as e:
ctx.warn(f"Error searching for citation key: {e}")
# Process annotations if citation key found
if citation_key:
try:
# Determine library ID
library_id = 1 # Default to personal library
search_results = bibtex._make_request("item.search", [citation_key])
if search_results:
matched_item = next((item for item in search_results if item.get('citekey') == citation_key), None)
if matched_item:
library_id = matched_item.get('libraryID', 1)
# Get attachments
attachments = bibtex.get_attachments(citation_key, library_id)
# Process annotations from attachments
for attachment in attachments:
annotations = bibtex.get_annotations_from_attachment(attachment)
for anno in annotations:
processed = process_annotation(anno, attachment)
if processed:
# Create Zotero-like annotation object
bibtex_anno = {
"key": processed.get("id", ""),
"data": {
"itemType": "annotation",
"annotationType": processed.get("type", "highlight"),
"annotationText": processed.get("annotatedText", ""),
"annotationComment": processed.get("comment", ""),
"annotationColor": processed.get("color", ""),
"parentItem": item_key,
"tags": [],
"_pdf_page": processed.get("page", 0),
"_pageLabel": processed.get("pageLabel", ""),
"_attachment_title": attachment.get("title", ""),
"_color_category": get_color_category(processed.get("color", "")),
"_from_better_bibtex": True
}
}
better_bibtex_annotations.append(bibtex_anno)
ctx.info(f"Retrieved {len(better_bibtex_annotations)} annotations via Better BibTeX")
except Exception as e:
ctx.warn(f"Error processing Better BibTeX annotations: {e}")
except Exception as bibtex_error:
ctx.warn(f"Error initializing Better BibTeX: {bibtex_error}")
# Fallback to Zotero API annotations
if not better_bibtex_annotations:
try:
# Get child annotations via Zotero API
children = zot.children(item_key)
zotero_api_annotations = [
item for item in children
if item.get("data", {}).get("itemType") == "annotation"
]
ctx.info(f"Retrieved {len(zotero_api_annotations)} annotations via Zotero API")
except Exception as api_error:
ctx.warn(f"Error retrieving Zotero API annotations: {api_error}")
# PDF Extraction fallback
if use_pdf_extraction and not (better_bibtex_annotations or zotero_api_annotations):
try:
from zotero_mcp.pdfannots_helper import extract_annotations_from_pdf, ensure_pdfannots_installed
import tempfile
import uuid
# Ensure PDF annotation tool is installed
if ensure_pdfannots_installed():
# Get PDF attachments
children = zot.children(item_key)
pdf_attachments = [
item for item in children
if item.get("data", {}).get("contentType") == "application/pdf"
]
# Extract annotations from PDFs
for attachment in pdf_attachments:
with tempfile.TemporaryDirectory() as tmpdir:
att_key = attachment.get("key", "")
file_path = os.path.join(tmpdir, f"{att_key}.pdf")
zot.dump(att_key, file_path)
if os.path.exists(file_path):
extracted = extract_annotations_from_pdf(file_path, tmpdir)
for ext in extracted:
# Skip empty annotations
if not ext.get("annotatedText") and not ext.get("comment"):
continue
# Create Zotero-like annotation object
pdf_anno = {
"key": f"pdf_{att_key}_{ext.get('id', uuid.uuid4().hex[:8])}",
"data": {
"itemType": "annotation",
"annotationType": ext.get("type", "highlight"),
"annotationText": ext.get("annotatedText", ""),
"annotationComment": ext.get("comment", ""),
"annotationColor": ext.get("color", ""),
"parentItem": item_key,
"tags": [],
"_pdf_page": ext.get("page", 0),
"_from_pdf_extraction": True,
"_attachment_title": attachment.get("data", {}).get("title", "PDF")
}
}
# Handle image annotations
if ext.get("type") == "image" and ext.get("imageRelativePath"):
pdf_anno["data"]["_image_path"] = os.path.join(tmpdir, ext.get("imageRelativePath"))
pdf_annotations.append(pdf_anno)
ctx.info(f"Retrieved {len(pdf_annotations)} annotations via PDF extraction")
except Exception as pdf_error:
ctx.warn(f"Error during PDF annotation extraction: {pdf_error}")
# Combine annotations from all sources
annotations = better_bibtex_annotations + zotero_api_annotations + pdf_annotations
else:
# Retrieve all annotations in the library
zot.add_parameters(itemType="annotation", limit=limit or 50)
annotations = zot.everything(zot.items())
# Handle no annotations found
if not annotations:
return f"No annotations found{f' for item: {parent_title}' if item_key else ''}."
# Generate markdown output
output = [f"# Annotations{f' for: {parent_title}' if item_key else ''}", ""]
for i, anno in enumerate(annotations, 1):
data = anno.get("data", {})
# Annotation details
anno_type = data.get("annotationType", "Unknown type")
anno_text = data.get("annotationText", "")
anno_comment = data.get("annotationComment", "")
anno_color = data.get("annotationColor", "")
anno_key = anno.get("key", "")
# Parent item context for library-wide retrieval
parent_info = ""
if not item_key and (parent_key := data.get("parentItem")):
try:
parent = zot.item(parent_key)
parent_title = parent["data"].get("title", "Untitled")
parent_info = f" (from \"{parent_title}\")"
except Exception:
parent_info = f" (parent key: {parent_key})"
# Annotation source details
source_info = ""
if data.get("_from_better_bibtex", False):
source_info = " (extracted via Better BibTeX)"
elif data.get("_from_pdf_extraction", False):
source_info = " (extracted directly from PDF)"
# Attachment context
attachment_info = ""
if "_attachment_title" in data and data["_attachment_title"]:
attachment_info = f" in {data['_attachment_title']}"
# Build markdown annotation entry
output.append(f"## Annotation {i}{parent_info}{attachment_info}{source_info}")
output.append(f"**Type:** {anno_type}")
output.append(f"**Key:** {anno_key}")
# Color information
if anno_color:
output.append(f"**Color:** {anno_color}")
if "_color_category" in data and data["_color_category"]:
output.append(f"**Color Category:** {data['_color_category']}")
# Page information
if "_pdf_page" in data:
label = data.get("_pageLabel", str(data["_pdf_page"]))
output.append(f"**Page:** {data['_pdf_page']} (Label: {label})")
# Annotation content
if anno_text:
output.append(f"**Text:** {anno_text}")
if anno_comment:
output.append(f"**Comment:** {anno_comment}")
# Image annotation
if "_image_path" in data and os.path.exists(data["_image_path"]):
output.append(f"**Image:** This annotation includes an image (not displayed in this interface)")
# Tags
if tags := data.get("tags"):
tag_list = [f"`{tag['tag']}`" for tag in tags]
if tag_list:
output.append(f"**Tags:** {' '.join(tag_list)}")
output.append("") # Empty line between annotations
return "\n".join(output)
except Exception as e:
ctx.error(f"Error fetching annotations: {str(e)}")
return f"Error fetching annotations: {str(e)}"
@mcp.tool(
name="zotero_get_notes",
description="Retrieve notes from your Zotero library, with options to filter by parent item."
)
def get_notes(
item_key: Optional[str] = None,
limit: Optional[int] = 20,
*,
ctx: Context
) -> str:
"""
Retrieve notes from your Zotero library.
Args:
item_key: Optional Zotero item key/ID to filter notes by parent item
limit: Maximum number of notes to return
ctx: MCP context
Returns:
Markdown-formatted list of notes
"""
try:
ctx.info(f"Fetching notes{f' for item {item_key}' if item_key else ''}")
zot = get_zotero_client()
# Prepare search parameters
params = {"itemType": "note"}
if item_key:
params["parentItem"] = item_key
# Get notes
notes = zot.items(**params) if not limit else zot.items(limit=limit, **params)
if not notes:
return f"No notes found{f' for item {item_key}' if item_key else ''}."
# Generate markdown output
output = [f"# Notes{f' for Item: {item_key}' if item_key else ''}", ""]
for i, note in enumerate(notes, 1):
data = note.get("data", {})
note_key = note.get("key", "")
# Parent item context
parent_info = ""
if parent_key := data.get("parentItem"):
try:
parent = zot.item(parent_key)
parent_title = parent["data"].get("title", "Untitled")
parent_info = f" (from \"{parent_title}\")"
except Exception:
parent_info = f" (parent key: {parent_key})"
# Prepare note text
note_text = data.get("note", "")
# Clean up HTML formatting
note_text = note_text.replace("<p>", "").replace("</p>", "\n\n")
note_text = note_text.replace("<br/>", "\n").replace("<br>", "\n")
# Limit note length for display
if len(note_text) > 500:
note_text = note_text[:500] + "..."
# Build markdown entry
output.append(f"## Note {i}{parent_info}")
output.append(f"**Key:** {note_key}")
# Tags
if tags := data.get("tags"):
tag_list = [f"`{tag['tag']}`" for tag in tags]
if tag_list:
output.append(f"**Tags:** {' '.join(tag_list)}")
output.append(f"**Content:**\n{note_text}")
output.append("") # Empty line between notes
return "\n".join(output)
except Exception as e:
ctx.error(f"Error fetching notes: {str(e)}")
return f"Error fetching notes: {str(e)}"
@mcp.tool(
name="zotero_search_notes",
description="Search for notes across your Zotero library."
)
def search_notes(
query: str,
limit: Optional[int] = 20,
*,
ctx: Context
) -> str:
"""
Search for notes in your Zotero library.
Args:
query: Search query string
limit: Maximum number of results to return
ctx: MCP context
Returns:
Markdown-formatted search results
"""
try:
if not query.strip():
return "Error: Search query cannot be empty"
ctx.info(f"Searching Zotero notes for '{query}'")
zot = get_zotero_client()
# Search for notes and annotations
results = []
# First search notes
zot.add_parameters(q=query, itemType="note", limit=limit or 20)
notes = zot.items()
# Then search annotations (reusing the get_annotations function)
annotation_results = get_annotations(
item_key=None, # Search all annotations
use_pdf_extraction=True,
limit=limit or 20,
ctx=ctx
)
# Parse the annotation results to extract annotation items
# This is a bit hacky and depends on the exact formatting of get_annotations
# You might want to modify get_annotations to return a more structured result
annotation_lines = annotation_results.split("\n")
current_annotation = None
annotations = []
for line in annotation_lines:
if line.startswith("## "):
if current_annotation:
annotations.append(current_annotation)
current_annotation = {"lines": [line], "type": "annotation"}
elif current_annotation is not None:
current_annotation["lines"].append(line)
if current_annotation:
annotations.append(current_annotation)
# Format results
output = [f"# Search Results for '{query}'", ""]
# Filter and highlight notes
query_lower = query.lower()
note_results = []
for note in notes:
data = note.get("data", {})
note_text = data.get("note", "").lower()
if query_lower in note_text:
# Prepare full note details
note_result = {
"type": "note",
"key": note.get("key", ""),
"data": data
}
note_results.append(note_result)
# Combine and sort results
all_results = note_results + annotations
for i, result in enumerate(all_results, 1):
if result["type"] == "note":
# Note formatting
data = result["data"]
key = result["key"]
# Parent item context
parent_info = ""
if parent_key := data.get("parentItem"):
try:
parent = zot.item(parent_key)
parent_title = parent["data"].get("title", "Untitled")
parent_info = f" (from \"{parent_title}\")"
except Exception:
parent_info = f" (parent key: {parent_key})"
# Note text with query highlight
note_text = data.get("note", "")
note_text = note_text.replace("<p>", "").replace("</p>", "\n\n")
note_text = note_text.replace("<br/>", "\n").replace("<br>", "\n")
# Highlight query in note text
try:
# Find first occurrence of query and extract context
text_lower = note_text.lower()
pos = text_lower.find(query_lower)
if pos >= 0:
# Extract context around the query
start = max(0, pos - 100)
end = min(len(note_text), pos + 200)
context = note_text[start:end]
# Highlight the query in the context
highlighted = context.replace(
context[context.lower().find(query_lower):context.lower().find(query_lower)+len(query)],
f"**{context[context.lower().find(query_lower):context.lower().find(query_lower)+len(query)]}**"
)
note_text = highlighted + "..."
except Exception:
# Fallback to first 500 characters if highlighting fails
note_text = note_text[:500] + "..."
output.append(f"## Note {i}{parent_info}")
output.append(f"**Key:** {key}")
# Tags
if tags := data.get("tags"):
tag_list = [f"`{tag['tag']}`" for tag in tags]
if tag_list:
output.append(f"**Tags:** {' '.join(tag_list)}")
output.append(f"**Content:**\n{note_text}")
output.append("")
elif result["type"] == "annotation":
# Add the entire annotation block
output.extend(result["lines"])
output.append("")
return "\n".join(output) if output else f"No results found for '{query}'"
except Exception as e:
ctx.error(f"Error searching notes: {str(e)}")
return f"Error searching notes: {str(e)}"
@mcp.tool(
name="zotero_create_note",
description="Create a new note for a Zotero item."
)
def create_note(
item_key: str,
note_title: str,
note_text: str,
tags: Optional[List[str]] = None,
*,
ctx: Context
) -> str:
"""
Create a new note for a Zotero item.
Args:
item_key: Zotero item key/ID to attach the note to
note_title: Title for the note
note_text: Content of the note (can include simple HTML formatting)
tags: List of tags to apply to the note
ctx: MCP context
Returns:
Confirmation message with the new note key
"""
try:
ctx.info(f"Creating note for item {item_key}")
zot = get_zotero_client()
# First verify the parent item exists
try:
parent = zot.item(item_key)
parent_title = parent["data"].get("title", "Untitled Item")
except Exception:
return f"Error: No item found with key: {item_key}"
# Format the note content with proper HTML
# If the note_text already has HTML, use it directly
if "<p>" in note_text or "<div>" in note_text:
html_content = note_text
else:
# Convert plain text to HTML paragraphs - avoiding f-strings with replacements
paragraphs = note_text.split("\n\n")
html_parts = []
for p in paragraphs:
# Replace newlines with <br/> tags
p_with_br = p.replace("\n", "<br/>")
html_parts.append("<p>" + p_with_br + "</p>")
html_content = "".join(html_parts)
# Prepare the note data
note_data = {
"itemType": "note",
"parentItem": item_key,
"note": html_content,
"tags": [{"tag": tag} for tag in (tags or [])]
}
# Create the note
result = zot.create_items([note_data])
# Check if creation was successful
if "success" in result and result["success"]:
successful = result["success"]
if len(successful) > 0:
note_key = next(iter(successful.keys()))
return f"Successfully created note for \"{parent_title}\"\n\nNote key: {note_key}"
else:
return f"Note creation response was successful but no key was returned: {result}"
else:
return f"Failed to create note: {result.get('failed', 'Unknown error')}"
except Exception as e:
ctx.error(f"Error creating note: {str(e)}")
return f"Error creating note: {str(e)}"