"""Main MCP server for yt-fetch."""
import asyncio
import logging
import os
from typing import Any
from mcp.server import Server
from mcp.server.lowlevel import NotificationOptions
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import (
EmbeddedResource,
ImageContent,
Resource,
TextContent,
Tool,
)
from .logging_config import setup_logging
from .resources import create_resources
from .tools import create_tools
from .youtube_service import YouTubeService
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# MCP server instance named "yt-fetch"; the handlers defined below register
# themselves on it through its decorators.
app = Server("yt-fetch")
# Shared YouTube service used by the tool and resource handlers.
# NOTE(review): this is constructed at import time, i.e. before main() checks
# YOUTUBE_API_KEY -- confirm the constructor does not require the key.
youtube_service = YouTubeService()
@app.list_tools()
async def handle_list_tools() -> list[Tool]:
    """Return the tool definitions built by ``create_tools``."""
    available_tools = create_tools()
    return available_tools
def _require_argument(arguments: dict[str, Any], key: str) -> Any:
    """Return ``arguments[key]``, failing with a descriptive error if absent.

    Raises:
        ValueError: If ``key`` is not present in ``arguments``.
    """
    if key not in arguments:
        raise ValueError(f"Missing required argument: {key}")
    return arguments[key]


@app.call_tool()
async def handle_call_tool(
    name: str, arguments: dict[str, Any] | None
) -> list[TextContent | ImageContent | EmbeddedResource]:
    """Dispatch a tool call to the matching ``youtube_service`` coroutine.

    Supported tool names are ``search_videos``, ``get_video_details``,
    ``get_channel_info``, ``filter_videos``, ``get_transcripts`` and
    ``trending_analysis``.

    Args:
        name: Name of the tool to execute.
        arguments: Keyword arguments for the tool; ``None`` is treated as an
            empty mapping.

    Returns:
        A one-element list containing a ``TextContent``. On success its text
        is ``str(result)``; on any failure (unknown tool, missing required
        argument, or an exception raised by the service) its text is
        ``"Error: <message>"``. Exceptions are logged, never propagated.
    """
    args = arguments if arguments is not None else {}

    try:
        if name == "search_videos":
            result = await youtube_service.search_videos(
                query=_require_argument(args, "query"),
                max_results=args.get("max_results", 10),
                order=args.get("order", "relevance"),
                published_after=args.get("published_after"),
                published_before=args.get("published_before"),
                duration=args.get("duration"),
                video_type=args.get("video_type"),
                region_code=args.get("region_code"),
            )
        elif name == "get_video_details":
            result = await youtube_service.get_video_details(
                video_id=_require_argument(args, "video_id")
            )
        elif name == "get_channel_info":
            result = await youtube_service.get_channel_info(
                channel_id=_require_argument(args, "channel_id")
            )
        elif name == "filter_videos":
            result = await youtube_service.filter_videos(
                videos=_require_argument(args, "videos"),
                min_views=args.get("min_views"),
                max_views=args.get("max_views"),
                min_duration=args.get("min_duration"),
                max_duration=args.get("max_duration"),
                keywords=args.get("keywords"),
                exclude_keywords=args.get("exclude_keywords"),
            )
        elif name == "get_transcripts":
            result = await youtube_service.get_transcripts(
                video_ids=_require_argument(args, "video_ids"),
                analysis_type=args.get("analysis_type", "summary"),
            )
        elif name == "trending_analysis":
            result = await youtube_service.trending_analysis(
                category_id=args.get("category_id"),
                region_code=args.get("region_code", "US"),
                max_results=args.get("max_results", 25),
            )
        else:
            raise ValueError(f"Unknown tool: {name}")
        return [TextContent(type="text", text=str(result))]
    except Exception as e:
        # Top-level handler boundary: report the failure to the client as
        # text, but keep the full traceback in the log for diagnosis.
        logger.exception("Error in tool %s: %s", name, e)
        return [TextContent(type="text", text=f"Error: {e}")]
@app.list_resources()
async def handle_list_resources() -> list[Resource]:
    """Return the resource definitions built by ``create_resources``."""
    available_resources = create_resources()
    return available_resources
@app.read_resource()
async def handle_read_resource(uri: str) -> str:
    """Resolve a ``youtube://`` resource URI and return its content as text.

    Recognised URI shapes:
        * ``youtube://search/<query>`` -- runs ``search_videos``.
        * ``youtube://video/<video_id>/metadata`` -- runs ``get_video_details``.
        * ``youtube://channel/<channel_id>`` -- runs ``get_channel_info``.

    Args:
        uri: The resource URI. Converted with ``str()`` before inspection.

    Returns:
        ``str(result)`` from the service call, or ``"Error: <message>"`` when
        the URI is not recognised or the service raises. Exceptions are
        logged, never propagated.
    """
    from urllib.parse import unquote

    # The MCP SDK may pass a pydantic ``AnyUrl`` rather than a plain ``str``;
    # ``AnyUrl`` has no ``startswith``/``replace``, so normalise to text first.
    uri_text = str(uri)

    try:
        if uri_text.startswith("youtube://search/"):
            # Strip only the leading scheme/prefix (``str.replace`` would also
            # remove later occurrences) and undo percent-encoding so that a
            # query such as ``cat%20videos`` is searched as ``cat videos``.
            query = unquote(uri_text.removeprefix("youtube://search/"))
            result = await youtube_service.search_videos(query=query)
        elif uri_text.startswith("youtube://video/") and uri_text.endswith(
            "/metadata"
        ):
            video_id = uri_text.removeprefix("youtube://video/").removesuffix(
                "/metadata"
            )
            result = await youtube_service.get_video_details(video_id=video_id)
        elif uri_text.startswith("youtube://channel/"):
            channel_id = uri_text.removeprefix("youtube://channel/")
            result = await youtube_service.get_channel_info(channel_id=channel_id)
        else:
            raise ValueError(f"Unknown resource URI: {uri_text}")
        return str(result)
    except Exception as e:
        # Top-level handler boundary: report the failure as text, but keep
        # the full traceback in the log for diagnosis.
        logger.exception("Error reading resource %s: %s", uri_text, e)
        return f"Error: {e}"
async def main():
    """Configure logging, validate the environment and serve over stdio.

    Raises:
        ValueError: If the ``YOUTUBE_API_KEY`` environment variable is unset
            or empty.
    """
    # Logging level comes from LOG_LEVEL, defaulting to INFO.
    log_level = os.getenv("LOG_LEVEL", "INFO")
    setup_logging(level=log_level)

    # Refuse to start without an API key.
    api_key = os.getenv("YOUTUBE_API_KEY")
    if not api_key:
        logger.error("YOUTUBE_API_KEY environment variable is required")
        raise ValueError("YOUTUBE_API_KEY environment variable is required")

    # Serve requests over the stdio transport until the streams close.
    async with stdio_server() as streams:
        read_stream, write_stream = streams
        capabilities = app.get_capabilities(
            notification_options=NotificationOptions(),
            experimental_capabilities={},
        )
        init_options = InitializationOptions(
            server_name="yt-fetch",
            server_version="0.1.0",
            capabilities=capabilities,
        )
        await app.run(read_stream, write_stream, init_options)
def cli_main():
    """Run the async ``main`` coroutine to completion from synchronous code."""
    server_coroutine = main()
    asyncio.run(server_coroutine)
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    cli_main()