"""Daily Briefing MCP server implementation."""
import asyncio
from datetime import date, datetime, timedelta
from typing import Any

from fastmcp import FastMCP

from daily_briefing.aggregator import BriefingAggregator
from daily_briefing.config import settings
from daily_briefing.formatters import MarkdownFormatter, TextFormatter
from daily_briefing.logging import configure_logging, get_logger
from daily_briefing.sources import CalendarSource, FirefliesSource
# Configure logging at import time, then create this module's logger.
configure_logging()
logger = get_logger(__name__)
# Initialize the MCP server; the @mcp.tool() decorators below register
# each tool function on this instance.
mcp = FastMCP(
name="daily-briefing",
version="0.1.0",
)
# Lazy-initialized aggregator: stays None until get_aggregator() builds it,
# after which the same instance is reused.
_aggregator: BriefingAggregator | None = None
def get_aggregator() -> BriefingAggregator:
    """Get or create the briefing aggregator with configured sources.

    On the first call the sources are assembled from ``settings`` (TripIt,
    Google Calendar, any additional iCal feeds, and Fireflies, each only when
    enabled and configured) and the resulting aggregator is cached in the
    module-level ``_aggregator``. Later calls return the cached instance.

    Returns:
        The shared ``BriefingAggregator`` instance.
    """
    global _aggregator
    if _aggregator is not None:
        return _aggregator

    sources = []

    # Add TripIt source (travel calendar)
    if settings.tripit_enabled and settings.tripit_ical_url:
        sources.append(
            CalendarSource(
                name="TripIt",
                ical_url=settings.tripit_ical_url,
                enabled=True,
                is_travel_source=True,
            )
        )
        logger.info("source_configured", source="TripIt")

    # Add Google Calendar source
    if settings.google_calendar_enabled and settings.google_calendar_url:
        sources.append(
            CalendarSource(
                name=settings.google_calendar_name,
                ical_url=settings.google_calendar_url,
                enabled=True,
                is_travel_source=False,
            )
        )
        logger.info("source_configured", source=settings.google_calendar_name)

    # Add additional iCal sources (comma-separated URLs with optional,
    # positionally matched comma-separated names)
    if settings.additional_ical_urls:
        urls = settings.additional_ical_urls.split(",")
        names = (
            settings.additional_ical_names.split(",")
            if settings.additional_ical_names
            else []
        )
        for i, url in enumerate(urls):
            url = url.strip()
            if not url:
                # Skip blank entries such as a trailing comma.
                continue
            # Fall back to a generated name when the name is missing OR
            # blank (e.g. "Work,,Home"), so no source ends up unnamed.
            configured_name = names[i].strip() if i < len(names) else ""
            name = configured_name or f"Calendar {i + 1}"
            sources.append(
                CalendarSource(
                    name=name,
                    ical_url=url,
                    enabled=True,
                    is_travel_source=False,
                )
            )
            logger.info("source_configured", source=name)

    # Add Fireflies source
    if settings.fireflies_enabled and settings.fireflies_api_key:
        sources.append(
            FirefliesSource(
                api_key=settings.fireflies_api_key,
                api_url=settings.fireflies_api_url,
                enabled=True,
            )
        )
        logger.info("source_configured", source="Fireflies")

    _aggregator = BriefingAggregator(sources)
    logger.info("aggregator_initialized", source_count=len(sources))
    return _aggregator
@mcp.tool()
async def health_check() -> dict[str, Any]:
    """Check Daily Briefing server health and all source connections.

    Returns:
        Health status including all source statuses. The overall status is
        "healthy" only when every source reports "healthy" or "disabled";
        otherwise it is "degraded".
    """
    per_source = await get_aggregator().get_source_health()

    # Any status other than these two marks the whole server as degraded.
    acceptable = ("healthy", "disabled")
    degraded = any(
        entry.get("status") not in acceptable
        for entry in per_source.values()
    )
    overall = "degraded" if degraded else "healthy"

    return {
        "status": overall,
        "version": "0.1.0",
        "sources": per_source,
    }
@mcp.tool()
async def get_daily_briefing(
    briefing_date: str | None = None,
    format: str = "markdown",
) -> dict[str, Any]:
    """Generate a comprehensive daily briefing.

    Aggregates calendar events, meeting summaries, action items, and travel
    information from all configured sources into a unified briefing.

    Args:
        briefing_date: Date for the briefing (YYYY-MM-DD format, defaults to today)
        format: Output format - "markdown", "text", or "json". Any other
            value is rendered as markdown.

    Returns:
        The daily briefing in the requested format, or a dict with an
        "error" key when ``briefing_date`` cannot be parsed.
    """
    logger.info("get_daily_briefing_called", date=briefing_date, format=format)

    # Resolve the target date; an empty or missing value means today.
    if not briefing_date:
        target_date = date.today()
    else:
        try:
            target_date = date.fromisoformat(briefing_date)
        except ValueError:
            return {"error": f"Invalid date format: {briefing_date}. Use YYYY-MM-DD."}

    briefing = await get_aggregator().generate_briefing(target_date)

    # JSON output carries the structured briefing only (no separate "date" key).
    if format == "json":
        return {"briefing": briefing.to_dict()}

    # Everything that is not "text" falls through to markdown.
    formatter = TextFormatter if format == "text" else MarkdownFormatter
    return {
        "date": target_date.isoformat(),
        "briefing": formatter.format(briefing),
    }
@mcp.tool()
async def get_todays_schedule() -> dict[str, Any]:
    """Get today's calendar events and any scheduling conflicts.

    All-day events are left out of the event list, and free slots shorter
    than 30 minutes are not reported.

    Returns:
        Today's events, conflicts, and available time slots, plus total
        meeting hours and focus-time hours rounded to one decimal place.
    """
    logger.info("get_todays_schedule_called")

    # Resolve "today" exactly once so the date the briefing is generated for
    # and the date reported in the response cannot disagree if the call
    # straddles midnight.
    today = date.today()

    aggregator = get_aggregator()
    briefing = await aggregator.generate_briefing(today)

    clock_format = "%I:%M %p"  # 12-hour clock, e.g. "09:30 AM"

    events = [
        {
            "title": e.title,
            "start": e.start.strftime(clock_format),
            "end": e.end.strftime(clock_format),
            "duration_minutes": int((e.end - e.start).total_seconds() / 60),
            "location": e.location,
            "source": e.source,
            "type": e.event_type.value,
        }
        for e in briefing.events
        if not e.is_all_day
    ]

    conflicts = [
        {
            "type": c.conflict_type.value,
            "description": c.description,
            "severity": c.severity.value,
        }
        for c in briefing.conflicts
    ]

    free_slots = [
        {
            "start": s.start.strftime(clock_format),
            "end": s.end.strftime(clock_format),
            "duration_minutes": s.duration_minutes,
        }
        for s in briefing.free_slots
        if s.duration_minutes >= 30
    ]

    return {
        "date": today.isoformat(),
        "events": events,
        "event_count": len(events),
        "conflicts": conflicts,
        "free_slots": free_slots,
        "total_meeting_hours": round(briefing.total_meeting_hours, 1),
        "focus_time_hours": round(briefing.focus_time_hours, 1),
    }
@mcp.tool()
async def get_action_items() -> dict[str, Any]:
    """Get pending action items from recent meetings.

    Returns:
        List of action items with their sources, plus the item count.
    """
    logger.info("get_action_items_called")

    briefing = await get_aggregator().generate_briefing(date.today())

    collected = []
    for entry in briefing.action_items:
        # meeting_date may be absent; emit None rather than failing.
        if entry.meeting_date:
            meeting_day = entry.meeting_date.isoformat()
        else:
            meeting_day = None
        collected.append(
            {
                "text": entry.text,
                "source": entry.source,
                "meeting_title": entry.meeting_title,
                "meeting_date": meeting_day,
                "priority": entry.priority.value,
            }
        )

    return {"action_items": collected, "count": len(collected)}
# Serializes the temporary override of settings.meeting_lookback_days.
# Without it, two overlapping calls can each save the other's temporary value
# as the "original" and leave the shared setting permanently changed.
_meeting_lookback_lock = asyncio.Lock()


@mcp.tool()
async def get_recent_meetings(days: int = 1) -> dict[str, Any]:
    """Get summaries of recent meetings.

    Args:
        days: Number of days to look back (default: 1). Values below 1 are
            treated as 1.

    Returns:
        List of meeting summaries with key points and action items, the
        meeting count, and the lookback window that was actually applied.
    """
    logger.info("get_recent_meetings_called", days=days)

    # The lookback is clamped to at least one day; report this effective
    # value back to the caller instead of the raw argument.
    effective_days = max(days, 1)

    # NOTE(review): the override below is still visible to other tools that
    # generate a briefing while this call is awaiting; the lock only protects
    # the save/restore of the setting itself.
    async with _meeting_lookback_lock:
        # Temporarily adjust settings for this request
        original_lookback = settings.meeting_lookback_days
        settings.meeting_lookback_days = effective_days
        try:
            aggregator = get_aggregator()
            briefing = await aggregator.generate_briefing(date.today())
        finally:
            settings.meeting_lookback_days = original_lookback

    meetings = [
        {
            "title": m.title,
            "date": m.date.isoformat(),
            "duration_minutes": m.duration_minutes,
            "participants": m.participants,
            "overview": m.overview,
            "action_items": m.action_items,
            "key_topics": m.key_topics,
        }
        for m in briefing.recent_meetings
    ]

    return {
        "meetings": meetings,
        "count": len(meetings),
        "lookback_days": effective_days,
    }
# Serializes the temporary override of settings.travel_lookahead_days.
# Without it, two overlapping calls can each save the other's temporary value
# as the "original" and leave the shared setting permanently changed.
_travel_lookahead_lock = asyncio.Lock()


@mcp.tool()
async def get_upcoming_travel(days: int = 7) -> dict[str, Any]:
    """Get upcoming travel information.

    Args:
        days: Number of days to look ahead (default: 7)

    Returns:
        List of upcoming trips with details and alerts, the trip count, the
        briefing-level travel alerts, and the lookahead window requested.
    """
    logger.info("get_upcoming_travel_called", days=days)

    # Resolve "today" once so the briefing date and every "days_until"
    # value are computed against the same day, even across midnight.
    today = date.today()

    # NOTE(review): the override below is still visible to other tools that
    # generate a briefing while this call is awaiting; the lock only protects
    # the save/restore of the setting itself.
    async with _travel_lookahead_lock:
        # Temporarily adjust settings for this request
        original_lookahead = settings.travel_lookahead_days
        settings.travel_lookahead_days = days
        try:
            aggregator = get_aggregator()
            briefing = await aggregator.generate_briefing(today)
        finally:
            settings.travel_lookahead_days = original_lookahead

    travel = [
        {
            "title": t.title,
            "start_date": t.start_date.isoformat(),
            "end_date": t.end_date.isoformat(),
            "destination": t.destination,
            "days_until": (t.start_date - today).days,
            "events": [
                {
                    "title": e.title,
                    "type": e.event_type.value,
                    "start": e.start.isoformat(),
                }
                for e in t.events
            ],
            "alerts": t.alerts,
        }
        for t in briefing.upcoming_travel
    ]

    return {
        "travel": travel,
        "count": len(travel),
        "alerts": briefing.travel_alerts,
        "lookahead_days": days,
    }
@mcp.tool()
async def check_availability(
    check_date: str,
    duration_minutes: int = 60,
) -> dict[str, Any]:
    """Check available time slots on a specific date.

    Args:
        check_date: Date to check (YYYY-MM-DD format)
        duration_minutes: Minimum slot duration needed (default: 60)

    Returns:
        Available time slots that can accommodate the requested duration,
        or a dict with an "error" key when ``check_date`` cannot be parsed.
    """
    logger.info(
        "check_availability_called",
        date=check_date,
        duration=duration_minutes,
    )

    try:
        target_date = date.fromisoformat(check_date)
    except ValueError:
        return {"error": f"Invalid date format: {check_date}. Use YYYY-MM-DD."}

    briefing = await get_aggregator().generate_briefing(target_date)

    # Keep only the free slots long enough for the requested duration.
    matching = []
    for slot in briefing.free_slots:
        if slot.duration_minutes < duration_minutes:
            continue
        matching.append(
            {
                "start": slot.start.strftime("%I:%M %p"),
                "end": slot.end.strftime("%I:%M %p"),
                "duration_minutes": slot.duration_minutes,
            }
        )

    found = len(matching)
    return {
        "date": target_date.isoformat(),
        "requested_duration": duration_minutes,
        "available_slots": matching,
        "slot_count": found,
        "has_availability": found > 0,
    }
@mcp.tool()
async def get_week_overview() -> dict[str, Any]:
    """Get an overview of the upcoming week.

    Covers seven days starting today, generating one briefing per day in
    sequence.

    Returns:
        Summary of each day's events, meeting load, and travel, plus
        week-level totals and the day with the most meeting hours.
    """
    logger.info("get_week_overview_called")

    aggregator = get_aggregator()
    start = date.today()

    per_day = []
    for offset in range(7):
        current = start + timedelta(days=offset)
        daily = await aggregator.generate_briefing(current)
        # All-day events are not counted as meetings.
        timed_events = sum(1 for ev in daily.events if not ev.is_all_day)
        per_day.append(
            {
                "date": current.isoformat(),
                "day_name": current.strftime("%A"),
                "event_count": timed_events,
                "meeting_hours": round(daily.total_meeting_hours, 1),
                "focus_hours": round(daily.focus_time_hours, 1),
                "has_conflicts": len(daily.conflicts) > 0,
                "has_travel": len(daily.upcoming_travel) > 0,
            }
        )

    # Calculate week totals from the per-day (already rounded) figures.
    meeting_count = sum(entry["event_count"] for entry in per_day)
    hours_total = sum(entry["meeting_hours"] for entry in per_day)
    conflict_days = sum(1 for entry in per_day if entry["has_conflicts"])
    # max() returns the earliest day when several tie for most hours.
    busiest = max(per_day, key=lambda entry: entry["meeting_hours"])

    summary = {
        "total_meetings": meeting_count,
        "total_meeting_hours": round(hours_total, 1),
        "days_with_conflicts": conflict_days,
        "busiest_day": busiest["day_name"],
    }
    return {
        "week_start": start.isoformat(),
        "days": per_day,
        "summary": summary,
    }
def run_server() -> None:
    """Run the Daily Briefing MCP server.

    Uses the transport named in ``settings.transport``. The network
    transports ("streamable-http" and "sse") bind to ``settings.host`` and
    ``settings.port``; any other value runs the server over stdio.
    """
    logger.info(
        "starting_daily_briefing_server",
        transport=settings.transport,
        host=settings.host,
        port=settings.port,
    )

    if settings.transport in ("streamable-http", "sse"):
        # Pass the configured transport through instead of always using
        # "sse", so "streamable-http" is actually served as configured and
        # matches the transport logged above.
        mcp.run(
            transport=settings.transport,
            host=settings.host,
            port=settings.port,
        )
    else:
        mcp.run(transport="stdio")