Skip to main content
Glama
rossshannon

Pinboard MCP Server

by rossshannon
client.py16.5 kB
"""Pinboard API client wrapper with caching and rate limiting.""" import asyncio import time from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta from typing import Any, Optional import pinboard # type: ignore from cachetools import LRUCache from dateutil.parser import parse as parse_date from pinboard_mcp_server.models import Bookmark, TagCount class PinboardClient: """Pinboard API client with caching and rate limiting.""" def __init__(self, token: str): """Initialize the client with API token.""" self.token = token self.last_request_time = 0.0 self.min_request_interval = 3.0 # 3 seconds between requests # Initialize pinboard.py client self._pb = pinboard.Pinboard(token) # Cache for bookmarks and tags self._bookmark_cache: Optional[list[Bookmark]] = None self._tag_cache: Optional[list[TagCount]] = None self._last_update_time: Optional[datetime] = None self._cache_valid_until: Optional[datetime] = None self._has_expanded_data: bool = False # LRU cache for query results self._query_cache: LRUCache = LRUCache(maxsize=1000) # Thread pool for running sync pinboard.py calls self._executor = ThreadPoolExecutor(max_workers=1) def _convert_pinboard_bookmark(self, pb_bookmark) -> dict[str, Any]: """Convert pinboard.Bookmark object to dict format our models expect.""" # pinboard.Bookmark has: url, description, extended, tags (list), time (datetime) # We need: href, description, extended, tags (space-separated string), time (ISO string) tags_list = getattr(pb_bookmark, "tags", []) # Remove empty tags and join with spaces tags_str = " ".join([tag for tag in tags_list if tag.strip()]) time_obj = getattr(pb_bookmark, "time", None) if time_obj and hasattr(time_obj, "isoformat"): # Convert to UTC and format as Z-suffix (no double timezone conversion) time_str = time_obj.isoformat().replace("+00:00", "") + "Z" else: time_str = "2024-01-01T00:00:00Z" return { "href": pb_bookmark.url, "description": pb_bookmark.description, "extended": getattr(pb_bookmark, "extended", ""), "tags": tags_str, "time": time_str, } def _rate_limit_sync(self) -> None: """Ensure we don't exceed rate limits (synchronous).""" now = time.time() time_since_last = now - self.last_request_time if time_since_last < self.min_request_interval: time.sleep(self.min_request_interval - time_since_last) self.last_request_time = time.time() async def _run_in_executor(self, func, *args, **kwargs) -> Any: """Run a synchronous function in the thread pool.""" loop = asyncio.get_event_loop() return await loop.run_in_executor(self._executor, func, *args, **kwargs) async def _check_cache_validity(self) -> bool: """Check if cache is still valid using posts/update endpoint.""" if self._cache_valid_until and datetime.now() < self._cache_valid_until: return True try: def _get_update(): self._rate_limit_sync() return self._pb.posts.update() result = await self._run_in_executor(_get_update) last_update = parse_date(result["update_time"]) # Cache is valid if last update time hasn't changed if self._last_update_time and last_update <= self._last_update_time: # Extend cache validity for another hour self._cache_valid_until = datetime.now() + timedelta(hours=1) return True # Cache is invalid, update timestamp self._last_update_time = last_update return False except Exception: # If we can't check, assume cache is invalid return False async def _refresh_bookmark_cache(self, expand_search: bool = False) -> None: """Refresh the bookmark cache from Pinboard API. Args: expand_search: If True, get more bookmarks using posts.all() with date filter. If False, get only recent 100 bookmarks. """ def _get_posts() -> Any: self._rate_limit_sync() if expand_search: # Get bookmarks from the last 6 months - balance between comprehensive and reasonable # The LLM can intelligently select what's most relevant from_date = datetime.now() - timedelta(days=180) # 6 months return self._pb.posts.all( fromdt=from_date.strftime("%Y-%m-%dT%H:%M:%SZ") ) else: # Use posts.recent for initial cache - gets most recent 100 posts return self._pb.posts.recent(count=100) result: Any = await self._run_in_executor(_get_posts) # Handle both posts.recent() and posts.all() response formats if expand_search: # posts.all() returns a list directly posts_list = result if isinstance(result, list) else [] else: # posts.recent() returns a dict with 'posts' key posts_list = ( result["posts"] if isinstance(result, dict) and "posts" in result else [] ) self._bookmark_cache = [ Bookmark.from_pinboard(self._convert_pinboard_bookmark(post)) for post in posts_list ] self._cache_valid_until = datetime.now() + timedelta(hours=1) # Mark whether we have expanded data self._has_expanded_data = expand_search async def _search_by_tag_direct( self, tag: str, matches: list[Bookmark], from_date: Optional[datetime], to_date: Optional[datetime], limit: int, ) -> None: """Search for bookmarks by tag using Pinboard API directly. This is much more efficient than downloading all bookmarks. Modifies the matches list in-place. """ def _get_posts_by_tag() -> Any: self._rate_limit_sync() # Use posts.all with tag filter - much more efficient params = {"tag": tag} if from_date: params["fromdt"] = from_date.strftime("%Y-%m-%dT%H:%M:%SZ") if to_date: params["todt"] = to_date.strftime("%Y-%m-%dT%H:%M:%SZ") return self._pb.posts.all(**params) result: Any = await self._run_in_executor(_get_posts_by_tag) # posts.all() returns a list directly posts_list = result if isinstance(result, list) else [] # Convert and add to matches (up to limit) for post in posts_list: if len(matches) >= limit: break bookmark = Bookmark.from_pinboard(self._convert_pinboard_bookmark(post)) matches.append(bookmark) async def _refresh_tag_cache(self) -> None: """Refresh the tag cache from Pinboard API.""" def _get_tags() -> Any: self._rate_limit_sync() return self._pb.tags.get() result: Any = await self._run_in_executor(_get_tags) # The result is a list of Tag objects with .name and .count attributes self._tag_cache = [TagCount(tag=tag.name, count=tag.count) for tag in result] async def get_all_bookmarks(self, expand_if_needed: bool = False) -> list[Bookmark]: """Get bookmarks, using cache when possible. Args: expand_if_needed: If True, will expand search to get more bookmarks when initial cache is empty or insufficient. """ if not await self._check_cache_validity() or self._bookmark_cache is None: await self._refresh_bookmark_cache() # If we need expanded data and don't have it yet, fetch it if expand_if_needed and not self._has_expanded_data: await self._refresh_bookmark_cache(expand_search=True) return self._bookmark_cache or [] async def get_all_tags(self) -> list[TagCount]: """Get all tags with counts, using cache when possible.""" if self._tag_cache is None: await self._refresh_tag_cache() return self._tag_cache or [] async def search_bookmarks(self, query: str, limit: int = 20) -> list[Bookmark]: """Search bookmarks by query string.""" # Check query cache first cache_key = f"search:{query}:{limit}" if cache_key in self._query_cache: return self._query_cache[cache_key] # First try with recent bookmarks bookmarks = await self.get_all_bookmarks() query_lower = query.lower() # Search in title, notes, and tags matches = [] for bookmark in bookmarks: if ( query_lower in bookmark.title.lower() or query_lower in bookmark.notes.lower() or any(query_lower in tag.lower() for tag in bookmark.tags) ): matches.append(bookmark) if len(matches) >= limit: break # If no matches found, try optimized strategies before full expansion if not matches: # First, check if the query matches a tag exactly - use direct tag search tags = await self.get_all_tags() exact_tag_match = next( (tag.tag for tag in tags if tag.tag.lower() == query_lower), None ) if exact_tag_match: # Use efficient tag-based search try: await self._search_by_tag_direct( exact_tag_match, matches, None, None, limit ) except Exception: pass # Fall through to expanded search if tag search fails # If still no matches and we haven't expanded yet, try with more data # This allows comprehensive free-text search but limits scope for safety if not matches and not self._has_expanded_data: bookmarks = await self.get_all_bookmarks(expand_if_needed=True) for bookmark in bookmarks: if ( query_lower in bookmark.title.lower() or query_lower in bookmark.notes.lower() or any(query_lower in tag.lower() for tag in bookmark.tags) ): matches.append(bookmark) if len(matches) >= limit: break # Cache the result self._query_cache[cache_key] = matches return matches async def search_bookmarks_extended( self, query: str, days_back: int = 365, limit: int = 100 ) -> list[Bookmark]: """Extended search that looks further back in time for comprehensive results. Args: query: Search query to match against bookmark titles, notes, and tags days_back: How many days back to search (default 1 year) limit: Maximum number of results to return (default 100) Note: This provides generous data for LLM filtering and analysis. Returns comprehensive results that the client can intelligently filter. """ cache_key = f"extended_search:{query}:{days_back}:{limit}" if cache_key in self._query_cache: return self._query_cache[cache_key] query_lower = query.lower() # First check if this is an exact tag match - use efficient tag search tags = await self.get_all_tags() exact_tag_match = next( (tag.tag for tag in tags if tag.tag.lower() == query_lower), None ) matches: list[Bookmark] = [] if exact_tag_match: # Use efficient tag-based search for exact matches try: await self._search_by_tag_direct( exact_tag_match, matches, None, None, limit ) except Exception: pass # If no tag match or tag search failed, do extended time-based search if not matches: def _get_extended_posts() -> Any: self._rate_limit_sync() from_date = datetime.now() - timedelta(days=days_back) return self._pb.posts.all( fromdt=from_date.strftime("%Y-%m-%dT%H:%M:%SZ") ) result: Any = await self._run_in_executor(_get_extended_posts) posts_list = result if isinstance(result, list) else [] # Search through the extended results for post in posts_list: if len(matches) >= limit: break bookmark = Bookmark.from_pinboard(self._convert_pinboard_bookmark(post)) if ( query_lower in bookmark.title.lower() or query_lower in bookmark.notes.lower() or any(query_lower in tag.lower() for tag in bookmark.tags) ): matches.append(bookmark) # Sort by most recent first matches.sort(key=lambda b: b.saved_at, reverse=True) # Cache the result self._query_cache[cache_key] = matches return matches async def get_recent_bookmarks( self, days: int = 7, limit: int = 20 ) -> list[Bookmark]: """Get bookmarks from the last N days.""" cache_key = f"recent:{days}:{limit}" if cache_key in self._query_cache: return self._query_cache[cache_key] bookmarks = await self.get_all_bookmarks() cutoff_date = datetime.now() - timedelta(days=days) # Filter by date and sort by most recent first recent = [ bookmark for bookmark in bookmarks if bookmark.saved_at.replace(tzinfo=None) >= cutoff_date ] recent.sort(key=lambda b: b.saved_at, reverse=True) result = recent[:limit] self._query_cache[cache_key] = result return result async def get_bookmarks_by_tags( self, tags: list[str], from_date: Optional[datetime] = None, to_date: Optional[datetime] = None, limit: int = 20, ) -> list[Bookmark]: """Get bookmarks filtered by tags and optional date range.""" cache_key = f"tags:{':'.join(sorted(tags))}:{from_date}:{to_date}:{limit}" if cache_key in self._query_cache: return self._query_cache[cache_key] # First try with recent bookmarks bookmarks = await self.get_all_bookmarks() tag_set = {tag.lower() for tag in tags} matches = [] for bookmark in bookmarks: bookmark_tags = {tag.lower() for tag in bookmark.tags} # Check if all requested tags are present if tag_set.issubset(bookmark_tags): # Check date range if specified bookmark_date = bookmark.saved_at.replace(tzinfo=None) if from_date and bookmark_date < from_date: continue if to_date and bookmark_date > to_date: continue matches.append(bookmark) if len(matches) >= limit: break # If no matches found, try a targeted search by tag using Pinboard API # This is much more efficient than downloading all bookmarks and gets EVERYTHING if not matches and len(tags) == 1: # For single tag searches, use posts.all with tag filter - gets ALL bookmarks with that tag try: await self._search_by_tag_direct( tags[0], matches, from_date, to_date, limit ) except Exception: # If tag search fails, there's likely no bookmarks with that tag pass # matches will remain empty # Sort by most recent first matches.sort(key=lambda b: b.saved_at, reverse=True) self._query_cache[cache_key] = matches return matches async def close(self) -> None: """Close the client and clean up resources.""" self._executor.shutdown(wait=True)

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rossshannon/pinboard-bookmarks-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server