"""MCP Server with YouTube transcript and future tools."""
from typing import Literal, Optional
from mcp.server.fastmcp import FastMCP
from core import Config, TTLCache, RateLimiter
from tools import get_transcript, list_transcript_languages

# Initialize configuration
Config.ensure_cache_dir()

# Initialize cache and rate limiter (each is None when disabled)
cache = (
    TTLCache(
        ttl_seconds=Config.CACHE_TTL,
        max_size=Config.CACHE_MAX_SIZE,
        cache_dir=Config.CACHE_DIR,
    )
    if Config.CACHE_ENABLED
    else None
)
rate_limiter = (
    RateLimiter(
        max_requests=Config.RATE_LIMIT_REQUESTS,
        window_seconds=Config.RATE_LIMIT_WINDOW,
    )
    if Config.RATE_LIMIT_ENABLED
    else None
)

# Create MCP server
mcp = FastMCP("Personal MCP Server", stateless_http=True, json_response=True)


def check_rate_limit(client_id: str = "default") -> Optional[dict]:
    """Return an error dict if the rate limit is exceeded, otherwise None."""
    if rate_limiter and not rate_limiter.is_allowed(client_id):
        return {
            "success": False,
            "error": "RATE_LIMIT_EXCEEDED",
            "message": (
                f"Rate limit exceeded: {Config.RATE_LIMIT_REQUESTS} requests "
                f"per {Config.RATE_LIMIT_WINDOW} seconds"
            ),
        }
    return None


@mcp.tool()
def youtube_get_transcript(
    video_url_or_id: str,
    languages: Optional[list[str]] = None,
    format: Literal["plain", "structured", "srt", "vtt"] = "plain",
    use_cache: bool = True,
) -> dict:
    """
    Fetch a YouTube video transcript with optional caching.

    Args:
        video_url_or_id: YouTube URL or 11-character video ID
        languages: Preferred languages in order (default: ["en"])
        format: Output format - plain, structured (with timestamps), srt, or vtt
        use_cache: Whether to use a cached result if available (default: True)

    Returns:
        Dictionary with transcript data or error information

    Example:
        youtube_get_transcript("dQw4w9WgXcQ", languages=["en"], format="plain")
    """
    # Check rate limit
    rate_error = check_rate_limit()
    if rate_error:
        return rate_error

    # Check cache. The key uses the raw input, so a URL and a bare ID for the
    # same video are cached as separate entries.
    cache_key = f"{video_url_or_id}:{','.join(languages or ['en'])}:{format}"
    if use_cache and cache:
        cached_result = cache.get(cache_key)
        if cached_result:
            # Return a copy so the "cached" flag is not written into the stored entry
            return {**cached_result, "cached": True}

    # Fetch transcript
    result = get_transcript(video_url_or_id, languages, format)

    # Cache successful results (written even when use_cache is False)
    if result.get("success") and cache:
        cache.set(cache_key, result)

    # Copy for the same reason as above: keep the stored entry free of the flag
    return {**result, "cached": False}


@mcp.tool()
def youtube_list_languages(video_url_or_id: str) -> dict:
    """
    List all available transcript languages for a YouTube video.

    Use this tool before fetching a transcript to discover which languages
    are available and whether they are auto-generated or manual.

    Args:
        video_url_or_id: YouTube URL or 11-character video ID

    Returns:
        Dictionary with list of available languages and their metadata

    Example:
        youtube_list_languages("dQw4w9WgXcQ")
    """
    # Check rate limit
    rate_error = check_rate_limit()
    if rate_error:
        return rate_error

    return list_transcript_languages(video_url_or_id)


@mcp.tool()
def get_server_stats() -> dict:
    """
    Get MCP server statistics including cache and rate limit info.

    Useful for monitoring server performance and health.

    Returns:
        Dictionary with server statistics
    """
    stats = {
        "server": "Personal MCP Server",
        "tools": [
            "youtube_get_transcript",
            "youtube_list_languages",
            "get_server_stats",
        ],
    }

    if cache:
        stats["cache"] = cache.stats()
    if rate_limiter:
        stats["rate_limit"] = rate_limiter.get_stats()

    return stats


@mcp.resource("transcript://{video_id}")
def get_transcript_resource(video_id: str) -> str:
    """
    MCP Resource: Direct access to YouTube transcripts.

    URI format: transcript://VIDEO_ID

    Returns the plain-text English transcript. Serves the cached copy when
    one exists; otherwise fetches the transcript and caches it on success.
    On failure, returns a string starting with "Error:".
    """
    # Same key layout as youtube_get_transcript with languages=["en"], format="plain"
    cache_key = f"{video_id}:en:plain"

    if cache:
        cached = cache.get(cache_key)
        if cached and cached.get("success"):
            return cached.get("transcript", "")

    # Fetch if not cached
    result = get_transcript(video_id, ["en"], "plain")
    if result.get("success"):
        if cache:
            cache.set(cache_key, result)
        return result.get("transcript", "")

    return f"Error: {result.get('message', 'Unknown error')}"
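

class HostRewriteMiddleware:
    """ASGI middleware that rewrites the Host header to localhost for MCP compatibility."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] == "http":
            # Work on a shallow copy so the server's own scope dict is not mutated
            scope = dict(scope)
            # Rewrite the server to localhost
            scope["server"] = ("localhost", Config.PORT)
            # Replace only the Host header. Filtering the list (rather than
            # round-tripping through a dict) keeps repeated headers intact.
            headers = [
                (name, value)
                for name, value in scope.get("headers", [])
                if name != b"host"
            ]
            headers.append((b"host", f"localhost:{Config.PORT}".encode()))
            scope["headers"] = headers

        # Non-HTTP scopes (for example lifespan) pass through unchanged
        await self.app(scope, receive, send)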


def main():
    """Run the MCP server with uvicorn directly for better control."""
    import uvicorn

    print(f"Starting MCP server on {Config.HOST}:{Config.PORT}")
    print(f"Cache: {'Enabled' if Config.CACHE_ENABLED else 'Disabled'}")
    print(f"Rate limiting: {'Enabled' if Config.RATE_LIMIT_ENABLED else 'Disabled'}")

    # Get the MCP app and wrap with host rewriting middleware
    mcp_app = mcp.streamable_http_app()
    app = HostRewriteMiddleware(mcp_app)

    # Run with uvicorn
    uvicorn.run(app, host=Config.HOST, port=Config.PORT, server_header=False)


if __name__ == "__main__":
    main()