Smart Photo Journal MCP Server
by Siddhant-K-code
Verified
import logging
import re
import threading
from collections import Counter
from typing import List, Optional, Dict
import osxphotos
from datetime import datetime, timedelta, timezone
from mcp.server import Server
from mcp.server.models import InitializationOptions, ServerCapabilities
from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool
from dateparser import parse
from osxphotos import QueryOptions
from thefuzz import fuzz
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler("smart_photo_journal.log"),
logging.StreamHandler(),
],
)
class PhotosDBLoader:
def __init__(self):
self._db: Optional[osxphotos.PhotosDB] = None
self.start_loading()
def start_loading(self):
def load():
try:
self._db = osxphotos.PhotosDB()
logging.info("PhotosDB loaded successfully.")
except Exception as e:
logging.error(f"Failed to load PhotosDB: {e}")
raise
thread = threading.Thread(target=load)
thread.daemon = True
thread.start()
@property
def db(self) -> osxphotos.PhotosDB:
if self._db is None:
logging.warning("PhotosDB is still loading; access attempted.")
raise Exception("PhotosDB is still loading. Please try again later.")
return self._db
# Global PhotosDB loader instance
photos_loader = PhotosDBLoader()
# Initialize MCP server
server = Server("smart-photo-journal")
def fuzzy_match_search(text: str, search_term: str, threshold=60) -> bool:
"""Use fuzzy matching for more flexible text searching"""
if not text or not search_term:
return False
return fuzz.ratio(text.lower(), search_term.lower()) > threshold
def get_photo_details(photo: osxphotos.PhotoInfo) -> dict:
"""Enhanced photo details with reliable filename extraction"""
# Extract filename using multiple fallback methods
filename = (
getattr(photo, "original_filename", None)
or getattr(photo, "title", None)
or photo.path.split("/")[-1]
or photo.filename
)
return {
"filename": filename,
"date": photo.date.strftime("%Y-%m-%d %H:%M:%S"),
"location": photo.place.name if photo.place else "Unknown",
"path": photo.path,
"persons": photo.persons,
"labels": getattr(photo, "labels", []),
"keywords": getattr(photo, "keywords", []),
}
def get_photos_by_criteria(
photosdb, keyword=None, location=None, person=None, start_date=None, end_date=None
):
"""Enhanced photo search using QueryOptions"""
query_params = {
"photos": True,
"movies": False,
"incloud": True,
"ignore_case": True,
}
if keyword:
query_params["label"] = [keyword]
if start_date:
query_params["from_date"] = start_date
if end_date:
query_params["to_date"] = end_date
# Get initial results
photos = photosdb.query(QueryOptions(**query_params))
# Apply additional filters that aren't supported by QueryOptions
if location:
photos = [
p for p in photos if p.place and fuzzy_match_search(p.place.name, location)
]
if person:
photos = [
p
for p in photos
if p.persons
and any(fuzzy_match_search(p_name, person) for p_name in p.persons)
]
return photos
def parse_date_range(query: str):
"""Parse natural language date queries"""
parsed_date = parse(query, settings={"PREFER_DATES_FROM": "future"})
if not parsed_date:
raise ValueError("Could not parse date from query")
start_date = parsed_date.replace(day=1)
end_date = (start_date + timedelta(days=32)).replace(day=1) - timedelta(days=1)
return start_date, end_date
def format_photo_results(
photos: List[osxphotos.PhotoInfo], title: str
) -> List[TextContent]:
"""Standardized photo result formatting with filenames"""
if not photos:
return [TextContent(type="text", text=f"No {title.lower()} found")]
photos.sort(key=lambda x: x.date)
photo_details = []
for photo in photos:
details = get_photo_details(photo)
photo_details.append(
f"📷 File: {details['filename']}\n"
f" Date: {details['date']}\n"
f" Location: {details['location']}\n"
f" People: {', '.join(details['persons']) if details['persons'] else 'None'}\n"
f" Labels: {', '.join(details['labels']) if details['labels'] else 'None'}\n"
"---"
)
response = [f"{title} ({len(photos)} found):", "", *photo_details]
return [TextContent(type="text", text="\n".join(response))]
def parse_complex_query(query: str) -> dict:
"""Parse complex natural language queries with enhanced pattern matching"""
params = {}
# Handle both explicit and natural language patterns
# Explicit format: location:Udaipur person:papa
explicit_location = re.search(r"location:(\w+)", query)
explicit_person = re.search(r"person:(\w+)", query)
# Natural language: "photos of/from/in Udaipur with papa"
natural_location = re.search(r"(?:from|in|of)\s+([A-Za-z\s]+?)(?:\s+with|$)", query)
natural_person = re.search(r"with\s+([A-Za-z\s]+?)(?:\s+in|from|$)", query)
# Set location parameter
if explicit_location:
params["location"] = explicit_location.group(1)
elif natural_location:
params["location"] = natural_location.group(1).strip()
# Set person parameter
if explicit_person:
params["person"] = explicit_person.group(1)
elif natural_person:
params["person"] = natural_person.group(1).strip()
return params
@server.call_tool()
async def handle_time_analysis(self, query_input: dict) -> List[TextContent]:
"""Analyze photo patterns with filename display"""
photos = photos_loader.db.photos()
hour_distribution = Counter(photo.date.hour for photo in photos)
weekday_distribution = Counter(photo.date.strftime("%A") for photo in photos)
analysis = [
"📸 Photo Taking Patterns:",
f"Total Photos: {len(photos)}",
"",
"⏰ Hourly Distribution:",
]
# Add photo details for each time slot
for hour, count in sorted(hour_distribution.items()):
hour_photos = [p for p in photos if p.date.hour == hour]
analysis.append(f"\n{hour:02d}:00 - {count} photos:")
for photo in hour_photos[:3]: # Show first 3 photos per hour
details = get_photo_details(photo)
analysis.append(f" • {details['filename']} ({details['date']})")
return [TextContent(type="text", text="\n".join(analysis))]
@server.call_tool()
async def handle_complex_search(self, query_input: dict) -> List[TextContent]:
"""Handle complex multi-criteria photo searches"""
try:
query = query_input["query"]
params = parse_complex_query(query)
# Get photos matching all criteria
photos = get_photos_by_criteria(
photos_loader.db,
location=params.get("location"),
person=params.get("person"),
)
return format_photo_results(
photos,
f"Photos from {params.get('location', 'anywhere')} with {params.get('person', 'anyone')}",
)
except Exception as e:
logging.error(f"Error in complex search: {e}")
return [TextContent(type="text", text=f"Error processing search: {str(e)}")]
@server.call_tool()
async def handle_label_search(self, query_input: dict) -> List[TextContent]:
"""Enhanced label search with filename display"""
label = query_input["label"].lower()
photos = get_photos_by_criteria(photos_loader.db, keyword=label)
return format_photo_results(photos, f"Photos labeled as '{label}'")
@server.call_tool()
async def handle_people_search(self, query_input: dict) -> List[TextContent]:
"""Enhanced people search with filename display"""
person = query_input["person"].lower()
photos = get_photos_by_criteria(photos_loader.db, person=person)
return format_photo_results(photos, f"Photos with {person}")
def generate_photo_analysis(photos: List[osxphotos.PhotoInfo], title: str) -> str:
"""Generate comprehensive analysis with filenames"""
locations = Counter(p.place.name for p in photos if p.place)
people = Counter(person for p in photos for person in p.persons if p.persons)
months = Counter(p.date.strftime("%B %Y") for p in photos)
analysis = [
f"📸 {title} ({len(photos)} photos)",
"",
"📍 Top Locations:",
*[f" • {loc}: {count} photos" for loc, count in locations.most_common(5)],
"",
"👥 Featured People:",
*[
f" • {person}: {count} appearances"
for person, count in people.most_common(5)
],
"",
"📅 Timeline:",
*[f" • {month}: {count} photos" for month, count in months.most_common(5)],
"",
"🖼️ Photo Details:",
]
for photo in photos:
details = get_photo_details(photo)
analysis.append(
f"\nFile: {details['filename']}\n"
f"Date: {details['date']}\n"
f"Location: {details['location']}\n"
f"People: {', '.join(details['persons']) if details['persons'] else 'None'}"
)
return "\n".join(analysis)
@server.call_tool()
async def handle_location_search(self, query_input: dict) -> List[TextContent]:
"""Location-based photo search with original filenames"""
try:
location = query_input["location"].lower()
photos = photos_loader.db.photos()
# Filter photos by location
matching_photos = [
photo
for photo in photos
if photo.place and location in photo.place.name.lower()
]
if matching_photos:
# Sort by date for chronological order
matching_photos.sort(key=lambda x: x.date)
# Build response focusing on filenames
filenames = []
for photo in matching_photos:
details = get_photo_details(photo)
filenames.append(
f"📷 {details['filename']}\n"
f" Taken: {details['date']}\n"
f" At: {details['location']}\n"
"---"
)
response = [
f"Found {len(matching_photos)} photos from {query_input['location']}:",
"",
*filenames,
]
return [TextContent(type="text", text="\n".join(response))]
else:
return [
TextContent(
type="text", text=f"No photos found from {query_input['location']}"
)
]
except Exception as e:
logging.error(f"Error in location search: {e}")
return [
TextContent(type="text", text=f"Error searching for location: {str(e)}")
]
# List available tools
@server.list_tools()
async def handle_list_tools() -> List[Tool]:
return [
Tool(
name="location-search",
description="Find photos from specific locations",
inputSchema={
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "Location name to search for",
}
},
"required": ["location"],
},
),
Tool(
name="label-search",
description="Search photos by labels or keywords (e.g., Birthday, Beach, Dogs)",
inputSchema={
"type": "object",
"properties": {
"label": {
"type": "string",
"description": "Label or keyword to search for",
}
},
"required": ["label"],
},
),
Tool(
name="people-search",
description="Find photos containing specific people",
inputSchema={
"type": "object",
"properties": {
"person": {
"type": "string",
"description": "Name of person to search for",
}
},
"required": ["person"],
},
),
# TODO: Fix complex search tool, currently disabled; Either better regex or NLP
# Tool(
# name="complex-search",
# description="Search photos with multiple criteria (location, people, dates, labels)",
# inputSchema={
# "type": "object",
# "properties": {
# "query": {
# "type": "string",
# "description": "Natural language query combining multiple search criteria",
# }
# },
# "required": ["query"],
# },
# ),
]
# Main server loop
async def main():
"""Main function to start the MCP server."""
try:
logging.info("Starting Smart Photo Journal MCP server.")
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="Smart Photo Journal",
server_version="1.0",
capabilities=ServerCapabilities(
experimental_capabilities={},
),
),
)
except Exception as e:
logging.critical(f"Critical error in MCP server: {e}")
raise
if __name__ == "__main__":
import asyncio
try:
asyncio.run(main())
except KeyboardInterrupt:
logging.info("Server shutdown requested by user.")
except Exception as e:
logging.critical(f"Unhandled exception during server runtime: {e}")