# anybox_mcp_server.py (12.2 kB)
#!/usr/bin/env python3
"""
Anybox MCP Server
Provides MCP tools for interacting with Anybox bookmark manager via HTTP API.
"""
import asyncio
import os
from typing import Any
from urllib.parse import quote, urlencode

import httpx
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
# Anybox API configuration.
# The base URL defaults to the local endpoint http://127.0.0.1:6391 and can be
# overridden with the ANYBOX_BASE_URL environment variable (an unset or empty
# value falls back to the default). A trailing slash is stripped because
# request paths are appended with their own leading "/".
ANYBOX_BASE_URL = (os.getenv("ANYBOX_BASE_URL") or "http://127.0.0.1:6391").rstrip("/")
# API key read from the ANYBOX_API_KEY environment variable; empty when unset.
API_KEY = os.getenv("ANYBOX_API_KEY", "")
# MCP server instance on which the tool handlers in this module are registered.
app = Server("anybox-mcp-server")
class AnyboxAPI:
"""Wrapper for Anybox HTTP API"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = ANYBOX_BASE_URL
self.headers = {"x-api-key": api_key, "Content-Type": "application/json"}
async def search(
self,
query: str = "",
tag: str = "",
folder: str = "",
starred: str = "",
limit: int = 50,
) -> list[dict[str, Any]]:
"""Search for bookmarks"""
params = {}
if query:
params["q"] = query
if tag:
params["tag"] = tag
if folder:
params["folder"] = folder
if starred:
params["starred"] = starred
if limit:
params["limit"] = str(limit)
url = f"{self.base_url}/search"
if params:
url += "?" + urlencode(params)
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=self.headers, timeout=10.0)
response.raise_for_status()
return response.json()
async def get_tags(self) -> list[dict[str, Any]]:
"""Get all tags"""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.base_url}/tags", headers=self.headers, timeout=10.0
)
response.raise_for_status()
return response.json()
async def get_folders(self) -> list[dict[str, Any]]:
"""Get all folders"""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.base_url}/folders", headers=self.headers, timeout=10.0
)
response.raise_for_status()
return response.json()
async def save_link(
self,
url: str,
comment: str = "",
tags: list[str] = None,
folder: str = "",
starred: bool = False,
) -> dict[str, Any]:
"""Save a new link/note to Anybox"""
data = {
"note": url,
"comment": comment,
"tags": tags or [],
"starred": starred,
}
if folder:
data["folder"] = folder
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/save",
json=data,
headers=self.headers,
timeout=10.0,
)
response.raise_for_status()
return response.json()
async def update_document(
self, document_id: str, data: dict[str, Any]
) -> dict[str, Any]:
"""Update an existing document"""
async with httpx.AsyncClient() as client:
response = await client.put(
f"{self.base_url}/document/{document_id}",
json=data,
headers=self.headers,
timeout=10.0,
)
response.raise_for_status()
return response.json()
# Initialize the module-wide API client once at import time, using the key
# read from the ANYBOX_API_KEY environment variable (API_KEY above).
anybox = AnyboxAPI(API_KEY)
@app.list_tools()
async def list_tools() -> list[Tool]:
    """Return the four tools this server offers, each with its argument schema."""

    def string_field(description: str) -> dict[str, Any]:
        # Most arguments are plain strings that differ only in description.
        return {"type": "string", "description": description}

    search_properties = {
        "query": string_field("Search keyword (optional)"),
        "tag_id": string_field(
            "Tag ID to filter by (optional, use list_tags to get IDs)"
        ),
        "folder_id": string_field(
            "Folder ID to filter by (optional, use list_folders to get IDs)"
        ),
        "starred": {
            **string_field("Filter by starred status: 'yes' or 'no' (optional)"),
            "enum": ["yes", "no"],
        },
        "limit": {
            "type": "number",
            "description": "Maximum number of results (default: 50, max: 50)",
            "default": 50,
        },
    }

    save_properties = {
        "url": string_field("The URL or note content to save"),
        "comment": string_field("Optional comment/note about the bookmark"),
        "tag_ids": {
            "type": "array",
            "items": {"type": "string"},
            "description": "Array of tag IDs to apply (use list_tags to get IDs)",
        },
        "folder_id": string_field(
            "Folder ID to save to (use list_folders to get IDs)"
        ),
        "starred": {
            "type": "boolean",
            "description": "Mark as starred (default: false)",
            "default": False,
        },
    }

    search_tool = Tool(
        name="search_bookmarks",
        description="Search for bookmarks in Anybox. Can search by keyword, filter by tag, folder, or starred status.",
        inputSchema={"type": "object", "properties": search_properties},
    )
    tags_tool = Tool(
        name="list_tags",
        description="Get a list of all tags in Anybox with their IDs, names, colors, and bookmark counts.",
        inputSchema={"type": "object", "properties": {}},
    )
    folders_tool = Tool(
        name="list_folders",
        description="Get a list of all folders in Anybox with their IDs, names, colors, and bookmark counts.",
        inputSchema={"type": "object", "properties": {}},
    )
    save_tool = Tool(
        name="save_bookmark",
        description="Save a new bookmark/link to Anybox with optional tags, folder, comment, and starred status.",
        inputSchema={
            "type": "object",
            "properties": save_properties,
            "required": ["url"],
        },
    )
    return [search_tool, tags_tool, folders_tool, save_tool]
def _text(message: str) -> list[TextContent]:
    """Wrap a string in the single-item text response returned by every tool."""
    return [TextContent(type="text", text=message)]


def _failure(action: str, exc: Exception) -> list[TextContent]:
    """Build the error response for a failed tool call."""
    return _text(
        f"Error {action}: {exc}\n\n"
        "Make sure Anybox is running and the API key is correct."
    )


def _coerce_limit(raw: Any, default: int = 50) -> int:
    """Turn the client-supplied ``limit`` into a non-negative int.

    The schema types ``limit`` as a JSON number, so it may arrive as a float
    (``50.0``); sending that verbatim would produce ``limit=50.0``. Values
    that cannot be converted fall back to ``default``. A result of 0 makes
    ``AnyboxAPI.search`` omit the parameter altogether.
    """
    try:
        limit = int(raw)
    except (TypeError, ValueError, OverflowError):
        return default
    return max(limit, 0)


def _format_bookmarks(results: list[dict[str, Any]]) -> str:
    """Render search results as a numbered list."""
    parts = [f"Found {len(results)} bookmark(s):\n\n"]
    for index, link in enumerate(results, 1):
        parts.append(f"{index}. **{link.get('title', 'Untitled')}**\n")
        parts.append(f" URL: {link.get('url', 'N/A')}\n")
        parts.append(f" ID: {link.get('id', 'N/A')}\n")
        if link.get("description"):
            parts.append(f" Description: {link['description']}\n")
        if link.get("comment"):
            parts.append(f" Comment: {link['comment']}\n")
        if link.get("tags"):
            # A tag without a usable name becomes "" instead of breaking join().
            tag_names = [str(tag.get("name") or "") for tag in link["tags"]]
            parts.append(f" Tags: {', '.join(tag_names)}\n")
        if link.get("folder"):
            parts.append(f" Folder: {link['folder'].get('name', '')}\n")
        if link.get("isStarred"):
            parts.append(" ⭐ Starred\n")
        parts.append(f" Added: {link.get('dateAdded', 'N/A')}\n")
        parts.append("\n")
    return "".join(parts)


def _format_named_items(items: list[dict[str, Any]], noun: str) -> str:
    """Render tags or folders (``noun`` is "tag" or "folder") as a bullet list."""
    parts = [f"Found {len(items)} {noun}(s):\n\n"]
    for item in items:
        parts.append(f"• **{item.get('name', 'Untitled')}**\n")
        parts.append(f" ID: `{item.get('id', 'N/A')}`\n")
        parts.append(f" Count: {item.get('count', 0)} bookmark(s)\n")
        if item.get("color"):
            parts.append(f" Color: {item['color']}\n")
        parts.append("\n")
    return "".join(parts)


async def _search_bookmarks(arguments: dict[str, Any]) -> list[TextContent]:
    """Handle the search_bookmarks tool."""
    try:
        results = await anybox.search(
            arguments.get("query") or "",
            arguments.get("tag_id") or "",
            arguments.get("folder_id") or "",
            arguments.get("starred") or "",
            _coerce_limit(arguments.get("limit", 50)),
        )
        if not results:
            return _text("No bookmarks found matching your search criteria.")
        return _text(_format_bookmarks(results))
    except Exception as exc:  # tool boundary: report the failure to the client
        return _failure("searching bookmarks", exc)


async def _list_named_items(fetch: Any, noun: str) -> list[TextContent]:
    """Fetch and render tags or folders; ``fetch`` is the API coroutine function."""
    try:
        items = await fetch()
        if not items:
            return _text(f"No {noun}s found.")
        return _text(_format_named_items(items, noun))
    except Exception as exc:  # tool boundary: report the failure to the client
        return _failure(f"fetching {noun}s", exc)


async def _list_tags(_arguments: dict[str, Any]) -> list[TextContent]:
    """Handle the list_tags tool (takes no arguments)."""
    return await _list_named_items(anybox.get_tags, "tag")


async def _list_folders(_arguments: dict[str, Any]) -> list[TextContent]:
    """Handle the list_folders tool (takes no arguments)."""
    return await _list_named_items(anybox.get_folders, "folder")


async def _save_bookmark(arguments: dict[str, Any]) -> list[TextContent]:
    """Handle the save_bookmark tool."""
    url = arguments.get("url")
    # "url" is required by the schema; without this check a missing value
    # would be posted to Anybox as a null note.
    if not isinstance(url, str) or not url.strip():
        return _text("Error saving bookmark: a non-empty 'url' argument is required.")
    try:
        result = await anybox.save_link(
            url,
            arguments.get("comment") or "",
            arguments.get("tag_ids") or [],
            arguments.get("folder_id") or "",
            arguments.get("starred") or False,
        )
        output = "✅ Bookmark saved successfully!\n\n"
        if result.get("url"):
            output += f"Anybox URL: {result['url']}\n"
        if result.get("message"):
            output += f"Message: {result['message']}\n"
        return _text(output)
    except Exception as exc:  # tool boundary: report the failure to the client
        return _failure("saving bookmark", exc)


# Tool name -> coroutine function handling it.
_TOOL_HANDLERS = {
    "search_bookmarks": _search_bookmarks,
    "list_tags": _list_tags,
    "list_folders": _list_folders,
    "save_bookmark": _save_bookmark,
}


@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
    """Handle tool calls.

    Args:
        name: Name of the tool to run.
        arguments: Mapping of tool arguments; ``None`` is treated as empty.

    Returns:
        A single-item list of text content holding either the formatted
        result or an error message. Failures while talking to Anybox are
        reported as text rather than raised.
    """
    handler = _TOOL_HANDLERS.get(name)
    if handler is None:
        return _text(f"Unknown tool: {name}")
    return await handler(arguments or {})
async def main():
    """Run the MCP server over the stdio transport."""
    async with stdio_server() as streams:
        incoming, outgoing = streams
        init_options = app.create_initialization_options()
        await app.run(incoming, outgoing, init_options)
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())