# Complete Implementation Guide
## Enhanced MCP Server with LLM Fallback
This guide covers the remaining files needed to build a production-grade MCP server. For each file it gives the purpose, the key function signatures, and an implementation pattern to start from.
---
## Table of Contents
1. [Core Adapters](#core-adapters)
2. [MCP Tools](#mcp-tools)
3. [Main MCP Server](#main-mcp-server)
4. [Workflows](#workflows-langgraph)
5. [Schedulers](#schedulers)
6. [Utilities](#utilities)
7. [Setup Script](#setup-script)
8. [Documentation](#documentation)
---
## Core Adapters
### src/adapters/gmail_adapter.py
**Purpose**: Enhanced Gmail operations with LLM-powered categorization
**Key Features**:
- Search and filter emails with advanced queries
- Auto-categorize emails (work, personal, job applications, urgent)
- Extract email summaries using LLM
- Send emails with templates
- Detect calendar invites in emails
- Thread tracking
**Dependencies**:
```python
from googleapiclient.discovery import build
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import base64
from typing import List, Optional
from ..models.email_models import Email, EmailCategory, EmailFilter
from ..utils.google_auth import get_credentials
from ..utils.llm_manager import LLMManager
```
**Main Functions**:
```python
async def search_emails(filter: EmailFilter, llm_manager: LLMManager) -> List[Email]
async def get_email_detail(email_id: str) -> Email
async def send_email(draft: EmailDraft) -> dict
async def categorize_email(email: Email, llm_manager: LLMManager) -> EmailCategory
async def extract_calendar_invite(email: Email, llm_manager: LLMManager) -> Optional[dict]
async def generate_email_summary(emails: List[Email], llm_manager: LLMManager) -> str
```
**Implementation Pattern**:
```python
class GmailAdapter:
    def __init__(self):
        self.service = build('gmail', 'v1', credentials=get_credentials())

    async def search_emails(
        self, filter: EmailFilter, llm_manager: Optional[LLMManager] = None
    ) -> List[Email]:
        # llm_manager matches the signature listed above; it is only needed
        # when results should also be categorized
        # Build Gmail query
        query = self._build_query(filter)
        # Execute search (note: .execute() is a blocking call)
        results = self.service.users().messages().list(
            userId='me',
            q=query,
            maxResults=filter.max_results
        ).execute()
        # Process and return emails
        emails = []
        for msg in results.get('messages', []):
            email = await self.get_email_detail(msg['id'])
            emails.append(email)
        return emails

    async def categorize_email(self, email: Email, llm_manager: LLMManager) -> EmailCategory:
        prompt = f'''
        Categorize this email into one of: work, personal, job_application, urgent, spam, newsletter, social, financial, other
        From: {email.from_email}
        Subject: {email.subject}
        Body: {email.snippet[:500]}
        '''
        response = await llm_manager.generate(prompt)
        # Parse and return category
        return self._parse_category(response.content)
```
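
The pattern above calls `_build_query` without showing it. The following is a minimal sketch of one way to write it, not the project's actual helper. `EmailFilterSketch` is a hypothetical stand-in for `EmailFilter`: only `max_results` and `after_date` appear elsewhere in this guide, and the other field names are assumptions. The `from:`, `after:`, and `has:attachment` operators are standard Gmail search syntax.

```python
from dataclasses import dataclass
from datetime import date
from typing import Optional


@dataclass
class EmailFilterSketch:
    """Hypothetical stand-in for EmailFilter; field names are assumptions."""
    query: Optional[str] = None
    sender: Optional[str] = None
    after_date: Optional[date] = None
    has_attachment: bool = False
    max_results: int = 50


def build_query(f: EmailFilterSketch) -> str:
    """Translate filter fields into a Gmail search string."""
    parts = []
    if f.query:
        parts.append(f.query)
    if f.sender:
        parts.append(f"from:{f.sender}")
    if f.after_date:
        # Gmail's after: operator accepts dates written as YYYY/MM/DD
        parts.append(f"after:{f.after_date.strftime('%Y/%m/%d')}")
    if f.has_attachment:
        parts.append("has:attachment")
    return " ".join(parts)
```

An empty filter yields an empty string, which Gmail treats as "no restriction", so `max_results` becomes the only limit on the result set.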
---
### src/adapters/calendar_adapter.py
**Purpose**: Enhanced Google Calendar operations with smart features
**Key Features**:
- Create/update/delete events
- Detect meeting conflicts
- Find free time slots
- Smart reminders (15 min, 1 hr, 1 day before)
- Daily/weekly summaries
- Focus time blocking
**Main Functions**:
```python
async def create_event(event: CalendarEvent) -> dict
async def get_events(filter: CalendarEventFilter) -> List[CalendarEvent]
async def detect_conflicts(calendar_id: str, start: datetime, end: datetime) -> List[MeetingConflict]
async def find_free_slots(calendar_id: str, date: date, duration_minutes: int) -> List[dict]
async def create_focus_time(start: datetime, duration_minutes: int) -> dict
async def get_calendar_summary(calendar_id: str, days_ahead: int) -> CalendarSummary
```
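
Conflict detection and free-slot search both reduce to interval arithmetic once the events are fetched. The sketch below shows that core logic on plain `(start, end)` tuples; the function names and the tuple representation are illustrative assumptions, not the adapter's real types (`CalendarEvent`, `MeetingConflict`).

```python
from datetime import datetime, timedelta
from typing import List, Tuple

Interval = Tuple[datetime, datetime]


def overlaps(a: Interval, b: Interval) -> bool:
    """Half-open intervals overlap when each starts before the other ends."""
    return a[0] < b[1] and b[0] < a[1]


def find_conflicts(proposed: Interval, busy: List[Interval]) -> List[Interval]:
    """Return every busy interval that overlaps the proposed one."""
    return [slot for slot in busy if overlaps(proposed, slot)]


def free_slots(busy: List[Interval], day_start: datetime, day_end: datetime,
               duration: timedelta) -> List[Interval]:
    """Return gaps of at least `duration` between busy intervals."""
    slots = []
    cursor = day_start
    for start, end in sorted(busy):
        start = min(start, day_end)  # ignore anything past the window
        if start - cursor >= duration:
            slots.append((cursor, start))
        cursor = max(cursor, end)
    if day_end - cursor >= duration:
        slots.append((cursor, day_end))
    return slots
```

Treating intervals as half-open means back-to-back meetings (one ending at 10:00, the next starting at 10:00) are not reported as conflicts.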
---
### src/adapters/drive_adapter.py
**Purpose**: Google Drive integration for document search and retrieval
**Key Features**:
- Search documents across Drive
- Read document contents
- Summarize documents with LLM
- Attach documents to emails
- Support PDF, Docs, Sheets, text files
**Main Functions**:
```python
async def search_documents(query: str, file_types: List[str]) -> List[dict]
async def get_document_content(file_id: str) -> str
async def summarize_document(file_id: str, llm_manager: LLMManager) -> str
async def get_file_metadata(file_id: str) -> dict
```
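
`search_documents` ultimately has to produce a Drive `q` string. A minimal sketch of that step follows, assuming short type keys such as `"pdf"` and `"doc"` for the `file_types` argument (the keys and the helper name are hypothetical). The `fullText contains`, `mimeType =`, and `trashed = false` terms are standard Drive query syntax.

```python
from typing import List

# Short keys are an assumption; the values are real MIME types.
MIME_TYPES = {
    "pdf": "application/pdf",
    "doc": "application/vnd.google-apps.document",
    "sheet": "application/vnd.google-apps.spreadsheet",
    "text": "text/plain",
}


def build_drive_query(text: str, file_types: List[str]) -> str:
    """Build the `q` parameter for a Drive files.list call."""
    # Drive queries require backslashes and single quotes to be escaped
    escaped = text.replace("\\", "\\\\").replace("'", "\\'")
    clauses = [f"fullText contains '{escaped}'", "trashed = false"]
    mimes = [MIME_TYPES[t] for t in file_types if t in MIME_TYPES]
    if mimes:
        clauses.append("(" + " or ".join(f"mimeType = '{m}'" for m in mimes) + ")")
    return " and ".join(clauses)
```

Unknown type keys are silently dropped here; a stricter adapter might raise instead.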
---
### src/adapters/sheets_adapter.py
**Purpose**: Enhanced Google Sheets operations
**Key Features**:
- Read/write/append data
- Create tracking sheets (job applications, email analytics)
- Format cells and add formulas
- Batch operations
- Auto-create sheets from templates
**Main Functions**:
```python
async def read_sheet(spreadsheet_id: str, range: str) -> List[List[Any]]
async def write_sheet(spreadsheet_id: str, range: str, values: List[List[Any]]) -> dict
async def create_job_tracking_sheet() -> str
async def update_job_application(spreadsheet_id: str, application: JobApplication) -> dict
async def get_email_analytics(spreadsheet_id: str) -> dict
```
---
### src/adapters/keep_adapter.py
**Purpose**: Google Keep integration for notes and API key management
**Note**: Google Keep has no general-purpose public API; the official Keep API is restricted to Google Workspace enterprise accounts. This adapter therefore uses the Google Tasks API or Drive for notes storage.
**Alternative Implementation**:
- Use Google Drive to store encrypted notes
- Use Google Tasks API for task management
- Store API keys in an encrypted JSON file on Drive
**Main Functions**:
```python
async def create_note(title: str, content: str) -> dict
async def get_note(note_id: str) -> dict
async def store_api_key(service_name: str, api_key: str) -> dict
async def retrieve_api_key(service_name: str) -> str
async def list_notes() -> List[dict]
```
---
## MCP Tools
### src/tools/email_tools.py
**Purpose**: MCP tool definitions for email operations
**Tools**:
1. **search_emails** - Search with advanced filters
2. **get_email** - Get email details
3. **send_email** - Send new email
4. **reply_to_email** - Reply to existing email
5. **categorize_emails** - Auto-categorize batch of emails
6. **generate_email_summary** - Generate summary of emails
7. **extract_calendar_invites** - Extract calendar invites from emails
8. **mark_email_read** - Mark as read/unread
9. **archive_email** - Archive email
10. **delete_email** - Delete email or mark it as spam
**Tool Definition Pattern**:
```python
Tool(
    name="search_emails",
    description="""
    Search emails with advanced filtering. Supports:
    - Query string (Gmail syntax)
    - Date range
    - Sender/recipient filters
    - Category filters (work, personal, job applications, etc.)
    - Attachment filters
    Returns categorized emails with metadata.
    """,
    inputSchema={
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Gmail search query (e.g., 'from:example@gmail.com subject:meeting')"
            },
            "category": {
                "type": "string",
                "enum": ["work", "personal", "job_application", "urgent", "spam"],
                "description": "Filter by category"
            },
            "max_results": {
                "type": "integer",
                "default": 50,
                "description": "Maximum emails to return"
            },
            "after_date": {
                "type": "string",
                "description": "ISO date string (e.g., '2024-01-01')"
            }
        }
    }
)
```
---
### src/tools/calendar_tools.py
**Tools**:
1. **create_calendar_event** - Create event with attendees
2. **get_calendar_events** - List events with filters
3. **update_calendar_event** - Update existing event
4. **delete_calendar_event** - Delete event
5. **find_free_slots** - Find available time slots
6. **detect_meeting_conflicts** - Check for conflicts
7. **create_focus_time** - Block time for focus
8. **get_calendar_summary** - Daily/weekly summary
---
### src/tools/job_tracking_tools.py
**Tools**:
1. **add_job_application** - Track new application
2. **update_job_status** - Update application status
3. **get_job_applications** - List applications with filters
4. **get_job_summary** - Daily/weekly summary
5. **schedule_follow_up** - Set follow-up reminder
6. **link_email_to_job** - Link email to application
---
## Main MCP Server
### src/mcp_server.py
**Purpose**: Main MCP server that registers all tools and handles requests
**Structure**:
```python
import asyncio
import sys
from pathlib import Path
from mcp.server import Server
from mcp.types import Tool, TextContent
import json
import structlog

# Add project to path
sys.path.insert(0, str(Path(__file__).parent.parent))

from src.utils.config_loader import get_config
from src.utils.llm_manager import LLMManager
from src.adapters.gmail_adapter import GmailAdapter
from src.adapters.calendar_adapter import CalendarAdapter
from src.adapters.drive_adapter import DriveAdapter
from src.adapters.sheets_adapter import SheetsAdapter
from src.models.email_models import EmailFilter
from src.tools import email_tools, calendar_tools, job_tracking_tools

# Initialize
logger = structlog.get_logger(__name__)
config = get_config()
server = Server("enhanced-google-mcp-server")

# Initialize adapters
gmail = GmailAdapter()
calendar = CalendarAdapter()
drive = DriveAdapter()
sheets = SheetsAdapter()

# Initialize LLM manager
llm_config = config.get_llm_config()
llm_manager = LLMManager(config.yaml_config)


@server.list_tools()
async def list_tools():
    """Register all available tools"""
    tools = []
    # Email tools (10 tools)
    tools.extend(email_tools.get_tool_definitions())
    # Calendar tools (8 tools)
    tools.extend(calendar_tools.get_tool_definitions())
    # Job tracking tools (6 tools)
    tools.extend(job_tracking_tools.get_tool_definitions())
    # Drive tools (4 tools)
    # ... add drive tools
    # Sheets tools (4 tools)
    # ... add sheets tools
    logger.info("Registered tools", count=len(tools))
    return tools


@server.call_tool()
async def call_tool(name: str, arguments: dict):
    """Route tool calls to appropriate handlers"""
    logger.info("Tool called", tool=name, args=arguments)
    try:
        # `arguments` is the raw dict sent by the client, while the adapters
        # are typed against models. search_emails shows the conversion; the
        # send_email and create_event handlers need the same treatment
        # (EmailDraft, CalendarEvent).
        # Email tools
        if name == "search_emails":
            result = await gmail.search_emails(EmailFilter(**arguments), llm_manager)
            return [TextContent(type="text", text=json.dumps(result, default=str))]
        elif name == "send_email":
            result = await gmail.send_email(arguments)
            return [TextContent(type="text", text=json.dumps(result))]
        # Calendar tools
        elif name == "create_calendar_event":
            result = await calendar.create_event(arguments)
            return [TextContent(type="text", text=json.dumps(result, default=str))]
        elif name == "find_free_slots":
            # find_free_slots takes calendar_id, date, duration_minutes
            result = await calendar.find_free_slots(**arguments)
            return [TextContent(type="text", text=json.dumps(result, default=str))]
        # ... more tool handlers
        else:
            return [TextContent(
                type="text",
                text=json.dumps({"success": False, "error": f"Unknown tool: {name}"})
            )]
    except Exception as e:
        logger.error("Tool execution failed", tool=name, error=str(e))
        return [TextContent(
            type="text",
            text=json.dumps({"success": False, "error": str(e)})
        )]


async def main():
    """Main entry point"""
    from mcp.server.stdio import stdio_server
    logger.info("Starting Enhanced MCP Server")
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options()
        )


if __name__ == "__main__":
    asyncio.run(main())
```
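
With 30 or more tools, the `if`/`elif` chain in `call_tool` grows long. One alternative is a registry that maps tool names to handlers. The sketch below illustrates the idea in isolation; `HANDLERS`, `tool`, `dispatch`, and the `echo` tool are hypothetical names introduced here, and the code returns plain JSON strings rather than `TextContent` so that it runs without the MCP package installed.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[[Dict[str, Any]], Awaitable[Any]]
HANDLERS: Dict[str, Handler] = {}


def tool(name: str) -> Callable[[Handler], Handler]:
    """Register an async handler under a tool name."""
    def decorator(fn: Handler) -> Handler:
        HANDLERS[name] = fn
        return fn
    return decorator


@tool("echo")  # hypothetical tool, for illustration only
async def echo(arguments: Dict[str, Any]) -> Any:
    return {"success": True, "echo": arguments}


async def dispatch(name: str, arguments: Dict[str, Any]) -> str:
    """Look up the handler and return JSON, mirroring call_tool's error shape."""
    handler = HANDLERS.get(name)
    if handler is None:
        return json.dumps({"success": False, "error": f"Unknown tool: {name}"})
    try:
        return json.dumps(await handler(arguments), default=str)
    except Exception as e:
        return json.dumps({"success": False, "error": str(e)})
```

Inside the real server, `call_tool` would wrap the string returned by `dispatch` in a `TextContent`, and each adapter call would live in its own decorated handler.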
---
## Workflows (LangGraph)
### src/workflows/email_processing.py
**Purpose**: LangGraph workflow for email processing pipeline
**Steps**:
1. Fetch new emails
2. Categorize with LLM
3. Extract action items
4. Check for calendar invites
5. Identify job applications
6. Update database
7. Send notifications if urgent
**Pattern**:
```python
from typing import List, TypedDict

from langgraph.graph import StateGraph, END

# Email and JobApplication are the project's models; the node functions
# (fetch_emails, categorize_emails, ...) are defined elsewhere in this module.


class EmailWorkflowState(TypedDict):
    emails: List[Email]
    categorized_emails: List[Email]
    action_items: List[dict]
    calendar_invites: List[dict]
    job_applications: List[JobApplication]


def create_email_workflow():
    """Build the graph and return it compiled (compile() does not return a StateGraph)."""
    workflow = StateGraph(EmailWorkflowState)
    # Add nodes
    workflow.add_node("fetch", fetch_emails)
    workflow.add_node("categorize", categorize_emails)
    workflow.add_node("extract_actions", extract_action_items)
    workflow.add_node("check_calendar", check_calendar_invites)
    workflow.add_node("detect_jobs", detect_job_applications)
    workflow.add_node("notify", send_notifications)
    # Add edges
    workflow.set_entry_point("fetch")
    workflow.add_edge("fetch", "categorize")
    workflow.add_edge("categorize", "extract_actions")
    workflow.add_edge("extract_actions", "check_calendar")
    workflow.add_edge("check_calendar", "detect_jobs")
    workflow.add_edge("detect_jobs", "notify")
    workflow.add_edge("notify", END)
    return workflow.compile()
```
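
Step 7 says notifications go out only if something is urgent, whereas the pattern above always runs `notify`. One way to express the condition is a routing function passed to LangGraph's `add_conditional_edges`. The sketch below shows only the routing function, which is plain Python; the state here uses dicts with a `"category"` key as a stand-in for the `Email` model, which is an assumption.

```python
from typing import List, TypedDict


class RoutingState(TypedDict):
    # Plain dicts stand in for the Email model; the "category" key is assumed.
    categorized_emails: List[dict]


def route_after_detection(state: RoutingState) -> str:
    """Return the next step: "notify" only if at least one email is urgent."""
    has_urgent = any(
        e.get("category") == "urgent" for e in state["categorized_emails"]
    )
    return "notify" if has_urgent else "done"


# Wiring, in place of the unconditional "detect_jobs" -> "notify" edge:
# workflow.add_conditional_edges(
#     "detect_jobs",
#     route_after_detection,
#     {"notify": "notify", "done": END},
# )
```

Keeping the decision in a small pure function makes it easy to unit-test without building the graph.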
---
## Schedulers
### src/schedulers/email_scheduler.py
**Purpose**: Schedule automated email summaries
**Schedule**: Every 4 hours from 6 AM to 10 PM (6 AM, 10 AM, 2 PM, 6 PM, 10 PM)
**Implementation**:
```python
from datetime import datetime, timedelta

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

from src.utils.config_loader import get_config
# EmailDraft is assumed to live alongside EmailFilter in email_models
from src.models.email_models import EmailDraft, EmailFilter

config = get_config()


class EmailScheduler:
    def __init__(self, gmail_adapter, llm_manager):
        self.gmail = gmail_adapter
        self.llm_manager = llm_manager
        self.scheduler = AsyncIOScheduler()

    async def generate_and_send_summary(self):
        """Generate and send email summary"""
        # Fetch emails from last 4 hours
        filter = EmailFilter(
            after_date=datetime.now() - timedelta(hours=4),
            max_results=100
        )
        emails = await self.gmail.search_emails(filter, self.llm_manager)
        # Generate summary
        summary = await self.gmail.generate_email_summary(emails, self.llm_manager)
        # Send summary email
        draft = EmailDraft(
            to=[config.env_config.email_summary_recipient],
            subject=f"Email Summary - {datetime.now().strftime('%Y-%m-%d %H:%M')}",
            body=summary,
            html=True
        )
        await self.gmail.send_email(draft)

    def start(self):
        """Start scheduler"""
        # Schedule for 6 AM, 10 AM, 2 PM, 6 PM, 10 PM
        self.scheduler.add_job(
            self.generate_and_send_summary,
            CronTrigger(hour='6,10,14,18,22', minute=0),
            id='email_summary'
        )
        self.scheduler.start()
```
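
A fixed 4-hour lookback leaves a gap overnight: the 6 AM run only reaches back to 2 AM, so mail that arrived between 10 PM and 2 AM never appears in a summary. A possible fix is to start each window at the previous scheduled run. The helper below is a sketch under the assumption that the job fires within its scheduled hour and that naive local datetimes are acceptable; `RUN_HOURS` and `lookback_start` are names introduced here.

```python
from datetime import datetime, timedelta

RUN_HOURS = (6, 10, 14, 18, 22)  # same hours as the CronTrigger above


def lookback_start(now: datetime) -> datetime:
    """Return the scheduled run that precedes the run firing in the current hour."""
    current = now.replace(minute=0, second=0, microsecond=0)
    earlier = [h for h in RUN_HOURS if h < current.hour]
    if earlier:
        return current.replace(hour=earlier[-1])
    # First run of the day: reach back to yesterday's last run
    return (current - timedelta(days=1)).replace(hour=RUN_HOURS[-1])
```

In `generate_and_send_summary`, `after_date=lookback_start(datetime.now())` would then replace the fixed `timedelta(hours=4)`.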
---
## Setup Script
### src/setup.py
**Purpose**: Interactive setup for Google OAuth and configuration
```python
import shutil
import sys
from pathlib import Path

# Add project to path so `python src/setup.py` can import the src package
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))

from src.utils.google_auth import GoogleAuthManager, SCOPES


def main():
    print("=" * 60)
    print("Enhanced MCP Server - Setup")
    print("=" * 60)
    print()

    # Check credentials.json
    creds_path = PROJECT_ROOT / 'credentials.json'
    if not creds_path.exists():
        print("❌ credentials.json not found!")
        print()
        print("Please download it from:")
        print("1. Go to https://console.cloud.google.com/")
        print("2. Create/select project")
        print("3. Enable APIs: Gmail, Calendar, Drive, Sheets")
        print("4. Create OAuth 2.0 credentials")
        print("5. Download credentials.json")
        print("6. Place it in project root")
        return
    print("✅ credentials.json found")
    print()

    # Check .env
    env_path = PROJECT_ROOT / '.env'
    if not env_path.exists():
        print("⚠️ .env file not found")
        print("Copying from .env.example...")
        example = PROJECT_ROOT / '.env.example'
        if example.exists():
            shutil.copy(example, env_path)
            print("✅ .env created")
            print()
            print("📝 Please edit .env and add your API keys:")
            print(" - EURI_API_KEY")
            print(" - DEEPSEEK_API_KEY")
            print(" - GEMINI_API_KEY")
            print(" - ANTHROPIC_API_KEY")
            print(" - EMAIL_SUMMARY_RECIPIENT")
            print()

    # OAuth flow
    print("Starting OAuth authentication...")
    print()
    print("Required scopes:")
    for scope in SCOPES:
        print(f" • {scope}")
    print()
    auth_manager = GoogleAuthManager()
    try:
        auth_manager.get_credentials()
        print()
        print("✅ Authentication successful!")
        print()
        print("Setup complete! You can now:")
        print("1. Configure Claude Desktop (see claude_desktop_config.json)")
        print("2. Run: python src/mcp_server.py")
        print()
    except Exception as e:
        print(f"❌ Authentication failed: {e}")


if __name__ == "__main__":
    main()
```
---
## Claude Desktop Configuration
### claude_desktop_config.json
```json
{
  "mcpServers": {
    "enhanced-google-services": {
      "command": "python",
      "args": [
        "C:\\Users\\pbkap\\Documents\\euron\\Projects\\mcpwithgoogle\\enhanced-mcp-server\\src\\mcp_server.py"
      ],
      "env": {}
    }
  }
}
```
**Location**:
- Windows: `%APPDATA%\Claude\claude_desktop_config.json`
- Mac: `~/Library/Application Support/Claude/claude_desktop_config.json`
- Linux: `~/.config/Claude/claude_desktop_config.json`
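
The `args` path in the JSON above is specific to one machine. As a convenience, the entry can be generated for the current machine instead of edited by hand. The following sketch does that with the standard library only; `claude_config_path` and `server_entry` are hypothetical helper names, and the locations are the ones listed above.

```python
import os
import sys
from pathlib import Path


def claude_config_path(platform: str = sys.platform) -> Path:
    """Resolve the config file location for the given platform string."""
    if platform.startswith("win"):
        base = Path(os.environ.get("APPDATA", Path.home() / "AppData" / "Roaming"))
    elif platform == "darwin":
        base = Path.home() / "Library" / "Application Support"
    else:
        base = Path.home() / ".config"
    return base / "Claude" / "claude_desktop_config.json"


def server_entry(project_root: Path) -> dict:
    """Build the mcpServers entry with an absolute path for this machine."""
    return {
        "mcpServers": {
            "enhanced-google-services": {
                # Use the interpreter that runs this script (e.g. the venv's)
                "command": sys.executable,
                "args": [str(project_root / "src" / "mcp_server.py")],
                "env": {},
            }
        }
    }
```

Writing `json.dumps(server_entry(root), indent=2)` to `claude_config_path()` would produce the file; if a config already exists, its other `mcpServers` entries should be merged rather than overwritten.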
---
## Next Steps
1. **Copy all code** from this guide into the respective files
2. **Run setup**: `python src/setup.py`
3. **Configure .env** with your API keys
4. **Update claude_desktop_config.json**
5. **Restart Claude Desktop**
6. **Test tools** by asking Claude to list available tools
---
## Testing LLM Fallback
Create `scripts/test_llm_fallback.py`:
```python
import asyncio
import json
import sys
from pathlib import Path

# Add project to path so the script can be run from scripts/
sys.path.insert(0, str(Path(__file__).parent.parent))

from src.utils.config_loader import get_config
from src.utils.llm_manager import LLMManager


async def test():
    config = get_config()
    llm_config = config.get_llm_config()
    manager = LLMManager(config.yaml_config)
    # Test normal operation
    response = await manager.generate("Say hello")
    print(f"Response from {response.provider}: {response.content}")
    # Check health
    health = await manager.health_check()
    print(f"\nHealth: {json.dumps(health, indent=2)}")


asyncio.run(test())
```
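
The script above exercises `LLMManager` end to end but does not show what fallback means. As an illustration only (the real `LLMManager` internals are not covered in this guide), the pattern can be sketched as trying providers in order and returning the first success. `generate_with_fallback`, `failing`, and `working` are hypothetical names; the `provider` and `content` fields mirror the response attributes used in the test script.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Tuple


@dataclass
class LLMResponse:
    provider: str
    content: str


Provider = Tuple[str, Callable[[str], Awaitable[str]]]


async def generate_with_fallback(prompt: str, providers: List[Provider]) -> LLMResponse:
    """Try each provider in order; return the first successful response."""
    errors = []
    for name, call in providers:
        try:
            return LLMResponse(provider=name, content=await call(prompt))
        except Exception as e:  # a real manager would catch narrower errors
            errors.append(f"{name}: {e}")
    raise RuntimeError("All providers failed: " + "; ".join(errors))


async def failing(prompt: str) -> str:
    """Simulated provider that is always down."""
    raise ConnectionError("simulated outage")


async def working(prompt: str) -> str:
    """Simulated provider that echoes the prompt."""
    return f"echo: {prompt}"
```

Calling `asyncio.run(generate_with_fallback("hi", [("primary", failing), ("backup", working)]))` returns a response whose `provider` is `"backup"`, which is the behavior a fallback test should assert when the primary key is deliberately invalid.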
---
**Status**: Ready for implementation! Follow this guide to complete the system.