"""Zoho Timeline MCP Server - Interactive screenshot capture and timeline parsing."""
import os
import json
import asyncio
from pathlib import Path
from typing import Any, Sequence
from mcp.server import Server
from mcp.types import (
Resource,
Tool,
TextContent,
ImageContent,
EmbeddedResource,
)
from mcp.server.stdio import stdio_server
from .screenshot import ScreenshotManager
from .parser import TimelineParser
class ZohoTimelineMCPServer:
"""MCP Server for Zoho timeline screenshot capture and parsing."""
def __init__(self):
self.server = Server("zoho-timeline-mcp")
self.screenshot_manager = ScreenshotManager()
self.parser = None # Initialized when needed with API key
self.setup_handlers()
def setup_handlers(self):
"""Setup MCP protocol handlers."""
@self.server.list_tools()
async def list_tools() -> list[Tool]:
"""List available tools."""
return [
Tool(
name="capture_screenshot",
description=(
"Capture a screenshot of the active window. "
"Click on the Zoho CRM timeline window first, then call this tool. "
"Call multiple times to capture multiple screenshots. "
"Returns screenshot ID and preview."
),
inputSchema={
"type": "object",
"properties": {
"capture_type": {
"type": "string",
"enum": ["active_window", "full_screen"],
"default": "active_window",
"description": "Type of screenshot to capture"
}
}
}
),
Tool(
name="list_screenshots",
description="List all captured screenshots with their IDs and timestamps.",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="process_timeline",
description=(
"Process all captured screenshots to extract timeline data. "
"Call this after capturing all necessary screenshots. "
"Returns complete timeline JSON with workflows and functions."
),
inputSchema={
"type": "object",
"properties": {
"api_key": {
"type": "string",
"description": "Anthropic API key (optional, uses ANTHROPIC_API_KEY env var if not provided)"
}
}
}
),
Tool(
name="clear_screenshots",
description="Clear all captured screenshots and start fresh.",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="export_results",
description="Export the most recent timeline analysis to files (JSON and Markdown).",
inputSchema={
"type": "object",
"properties": {
"output_dir": {
"type": "string",
"description": "Directory to save output files (default: user's Documents folder)"
}
}
}
),
]
@self.server.call_tool()
async def call_tool(name: str, arguments: Any) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
"""Handle tool calls."""
if name == "capture_screenshot":
capture_type = arguments.get("capture_type", "active_window")
try:
if capture_type == "active_window":
screenshot_id, filepath = self.screenshot_manager.capture_active_window()
else:
screenshot_id, filepath = self.screenshot_manager.capture_screenshot()
count = self.screenshot_manager.get_screenshot_count()
return [
TextContent(
type="text",
text=f"✓ Screenshot captured: {screenshot_id}\n"
f"Total screenshots: {count}\n\n"
f"Ready to capture another screenshot, or call 'process_timeline' when done."
)
]
except Exception as e:
return [
TextContent(
type="text",
text=f"Error capturing screenshot: {str(e)}\n\n"
f"Make sure the Zoho CRM window is active and visible."
)
]
elif name == "list_screenshots":
screenshots = self.screenshot_manager.get_all_screenshots()
if not screenshots:
return [
TextContent(
type="text",
text="No screenshots captured yet. Use 'capture_screenshot' to start."
)
]
output = f"Captured {len(screenshots)} screenshot(s):\n\n"
for ss in screenshots:
output += f"- {ss['id']}: {ss['timestamp']}\n"
return [
TextContent(
type="text",
text=output
)
]
elif name == "process_timeline":
screenshots = self.screenshot_manager.get_all_screenshots()
if not screenshots:
return [
TextContent(
type="text",
text="No screenshots to process. Capture some screenshots first using 'capture_screenshot'."
)
]
api_key = arguments.get("api_key")
try:
# Initialize parser
self.parser = TimelineParser(api_key=api_key)
# Extract image paths
image_paths = [Path(ss["path"]) for ss in screenshots]
# Process timeline
timeline_data = self.parser.extract_timeline_from_images(image_paths)
# Extract workflows and functions
workflows_functions = self.parser.extract_workflows_and_functions(timeline_data)
# Generate report
report = self.parser.generate_report(timeline_data)
# Store results for export
self._last_results = {
"timeline": timeline_data,
"workflows_functions": workflows_functions,
"report": report
}
# Return combined results
return [
TextContent(
type="text",
text=f"✓ Timeline processed successfully!\n\n{report}\n\n"
f"Full JSON data:\n```json\n{json.dumps(timeline_data, indent=2)}\n```\n\n"
f"Use 'export_results' to save to files."
)
]
except Exception as e:
return [
TextContent(
type="text",
text=f"Error processing timeline: {str(e)}\n\n"
f"Make sure ANTHROPIC_API_KEY is set in your environment."
)
]
elif name == "clear_screenshots":
self.screenshot_manager.clear_screenshots()
return [
TextContent(
type="text",
text="✓ All screenshots cleared. Ready to start fresh."
)
]
elif name == "export_results":
if not hasattr(self, '_last_results'):
return [
TextContent(
type="text",
text="No results to export. Run 'process_timeline' first."
)
]
output_dir = arguments.get("output_dir")
if not output_dir:
output_dir = Path.home() / "Documents" / "ZohoTimelineAnalysis"
else:
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# Save files
timeline_path = output_dir / "timeline_data.json"
workflows_path = output_dir / "workflows_and_functions.json"
report_path = output_dir / "timeline_report.md"
with open(timeline_path, "w") as f:
json.dump(self._last_results["timeline"], f, indent=2)
with open(workflows_path, "w") as f:
json.dump(self._last_results["workflows_functions"], f, indent=2)
with open(report_path, "w") as f:
f.write(self._last_results["report"])
return [
TextContent(
type="text",
text=f"✓ Results exported to:\n"
f"- {timeline_path}\n"
f"- {workflows_path}\n"
f"- {report_path}"
)
]
else:
return [
TextContent(
type="text",
text=f"Unknown tool: {name}"
)
]
async def run(self):
"""Run the MCP server."""
async with stdio_server() as (read_stream, write_stream):
await self.server.run(
read_stream,
write_stream,
self.server.create_initialization_options()
)
def main():
"""Main entry point."""
server = ZohoTimelineMCPServer()
asyncio.run(server.run())
if __name__ == "__main__":
main()