#!/usr/bin/env python3
"""
Synphony MCP Server
A FastMCP server for managing video datasets with Hugging Face Hub integration.
"""

import os
import sys
from pathlib import Path
from typing import List, Dict, Optional, Any
import logging

from fastmcp import FastMCP
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Initialize FastMCP server
mcp = FastMCP("Synphony MCP")

# Configuration
DATASET_DIRECTORY = os.environ.get("DATASET_DIRECTORY", os.getcwd())
HF_TOKEN = os.environ.get("HF_TOKEN")
HF_DATASET_REPO_ID = os.environ.get("HF_DATASET_REPO_ID")

# Supported video extensions
VIDEO_EXTS = {
    ".mp4", ".mov", ".mkv", ".avi", ".wmv", ".flv",
    ".webm", ".m4v", ".mpeg", ".mpg", ".3gp", ".ts"
}

MAX_UPLOAD_BATCH = 50

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _normalize_and_validate_path(candidate: str) -> Path:
"""Validate and normalize a path within DATASET_DIRECTORY."""
base = Path(DATASET_DIRECTORY).resolve()
p = (base / candidate).resolve()
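    # p.parents contains every ancestor of the resolved path, so this check
    # rejects anything that escapes the base directory (e.g. ".." traversal
    # or absolute paths outside of it).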
if base not in p.parents and p != base:
raise ValueError(f"Path '{candidate}' is outside of DATASET_DIRECTORY")
return p


def _is_video_file(path: Path) -> bool:
"""Check if a file has a video extension."""
return path.suffix.lower() in VIDEO_EXTS


@mcp.tool
def list_datasets() -> Dict[str, Any]:
"""
List available datasets (directories) in the DATASET_DIRECTORY.

    Returns:
Dictionary with available datasets for user selection
"""
try:
base_path = Path(DATASET_DIRECTORY).resolve()
if not base_path.exists():
return {
"error": f"Dataset directory does not exist: {DATASET_DIRECTORY}",
"datasets": []
}
if not base_path.is_dir():
return {
"error": f"Dataset path is not a directory: {DATASET_DIRECTORY}",
"datasets": []
}
datasets = []
for item in base_path.iterdir():
if item.is_dir():
# Count video files in this dataset
video_count = 0
for file_path in item.rglob("*"):
if file_path.is_file() and _is_video_file(file_path):
video_count += 1
datasets.append({
"name": item.name,
"path": str(item.relative_to(base_path)),
"full_path": str(item),
"video_count": video_count
})
datasets.sort(key=lambda x: x["name"])
return {
"dataset_directory": DATASET_DIRECTORY,
"total_datasets": len(datasets),
"datasets": datasets,
"message": "Select a dataset to work with by using the dataset name or path"
}
except Exception as e:
return {
"error": f"Failed to list datasets: {str(e)}",
"datasets": []
}


@mcp.tool
def list_videos(directory: str = ".") -> Dict[str, Any]:
"""
List video files in a directory within DATASET_DIRECTORY.

    Args:
        directory: Relative path within DATASET_DIRECTORY to search (e.g., dataset name)

    Returns:
Dictionary with video files found and directory info
"""
try:
dir_path = _normalize_and_validate_path(directory)
if not dir_path.exists():
return {
"directory": directory,
"error": "Directory not found",
"videos": []
}
if not dir_path.is_dir():
return {
"directory": directory,
"error": "Path is not a directory",
"videos": []
}
videos = []
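        # Walk the directory tree recursively and keep only regular files
        # with a recognized video extension.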
for file_path in dir_path.rglob("*"):
if file_path.is_file() and _is_video_file(file_path):
                relative_path = file_path.relative_to(Path(DATASET_DIRECTORY).resolve())
videos.append({
"path": str(relative_path),
"name": file_path.name,
"size": file_path.stat().st_size,
"extension": file_path.suffix.lower()
})
return {
"directory": directory,
"dataset_directory": DATASET_DIRECTORY,
"total_videos": len(videos),
"videos": videos
}
except ValueError as e:
return {
"directory": directory,
"error": str(e),
"videos": []
}
except Exception as e:
return {
"directory": directory,
"error": f"Unexpected error: {str(e)}",
"videos": []
}


@mcp.tool
def get_server_info() -> Dict[str, Any]:
"""
Get server configuration and status information.

    Returns:
Dictionary with server configuration details
"""
return {
"server_name": "Synphony MCP",
"version": "1.0.0",
"dataset_directory": DATASET_DIRECTORY,
"hf_token_configured": bool(HF_TOKEN),
"hf_dataset_repo": HF_DATASET_REPO_ID,
"supported_video_extensions": list(VIDEO_EXTS),
"max_upload_batch": MAX_UPLOAD_BATCH,
"python_version": sys.version,
"working_directory": os.getcwd()
}


@mcp.tool
def validate_setup() -> Dict[str, Any]:
"""
Validate the server setup and configuration.

    Returns:
Dictionary with validation results
"""
issues = []
warnings = []
# Check dataset directory
dataset_path = Path(DATASET_DIRECTORY)
if not dataset_path.exists():
issues.append(f"DATASET_DIRECTORY does not exist: {DATASET_DIRECTORY}")
elif not dataset_path.is_dir():
issues.append(f"DATASET_DIRECTORY is not a directory: {DATASET_DIRECTORY}")
else:
# Check if readable
try:
list(dataset_path.iterdir())
except PermissionError:
issues.append(f"Cannot read DATASET_DIRECTORY: {DATASET_DIRECTORY}")
# Check HF configuration
if not HF_TOKEN:
warnings.append("HF_TOKEN not configured - Hugging Face features will be unavailable")
if not HF_DATASET_REPO_ID:
warnings.append("HF_DATASET_REPO_ID not configured - Hugging Face upload will be unavailable")
# Try importing optional dependencies
try:
import huggingface_hub
hf_available = True
except ImportError:
hf_available = False
warnings.append("huggingface_hub not installed - HF features unavailable")
return {
"valid": len(issues) == 0,
"issues": issues,
"warnings": warnings,
"huggingface_available": hf_available,
"dataset_dir_accessible": len([i for i in issues if "DATASET_DIRECTORY" in i]) == 0
}


@mcp.tool
def upload_to_huggingface(video_paths: List[str], dataset_name: Optional[str] = None) -> Dict[str, Any]:
"""
Upload video files to Hugging Face Hub dataset.

    Args:
        video_paths: List of relative paths to video files within DATASET_DIRECTORY
        dataset_name: Optional name for a new dataset. If provided, a username/dataset_name repo is created

    Returns:
Dictionary with upload results and status
"""
if not HF_TOKEN:
return {
"success": False,
"error": "HF_TOKEN not configured. Please set your Hugging Face token in the environment.",
"uploaded_files": [],
"failed_files": []
}
# Determine target repository
if dataset_name:
# Create new dataset with user's username
try:
from huggingface_hub import HfApi
api = HfApi(token=HF_TOKEN)
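            # whoami() identifies the account behind HF_TOKEN; its "name" field
            # becomes the namespace for the new dataset repo.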
user_info = api.whoami()
username = user_info["name"]
target_repo_id = f"{username}/{dataset_name}"
except Exception as e:
return {
"success": False,
"error": f"Failed to get user info for dataset creation: {str(e)}",
"uploaded_files": [],
"failed_files": []
}
elif HF_DATASET_REPO_ID:
target_repo_id = HF_DATASET_REPO_ID
else:
return {
"success": False,
"error": "Either provide dataset_name parameter or configure HF_DATASET_REPO_ID environment variable.",
"uploaded_files": [],
"failed_files": []
}
if not video_paths:
return {
"success": False,
"error": "No video paths provided",
"uploaded_files": [],
"failed_files": []
}
if len(video_paths) > MAX_UPLOAD_BATCH:
return {
"success": False,
"error": f"Too many files to upload at once. Maximum allowed: {MAX_UPLOAD_BATCH}, provided: {len(video_paths)}",
"uploaded_files": [],
"failed_files": []
}
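    # Import lazily so the server still starts (and other tools keep working)
    # when huggingface_hub is not installed; the ImportError is reported below.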
try:
from huggingface_hub import HfApi, upload_file, create_repo
api = HfApi(token=HF_TOKEN)
uploaded_files = []
failed_files = []
# Create dataset repository if using dataset_name parameter
if dataset_name:
try:
logger.info(f"Creating dataset repository: {target_repo_id}")
create_repo(
repo_id=target_repo_id,
token=HF_TOKEN,
repo_type="dataset",
exist_ok=True, # Don't fail if repo already exists
private=False # Make dataset public by default
)
logger.info(f"Dataset repository {target_repo_id} created/confirmed")
except Exception as e:
logger.warning(f"Failed to create repository {target_repo_id}: {str(e)}")
# Continue anyway - repo might already exist
logger.info(f"Starting upload of {len(video_paths)} files to {target_repo_id}")
for video_path in video_paths:
try:
# Validate and get full path
full_path = _normalize_and_validate_path(video_path)
if not full_path.exists():
failed_files.append({
"path": video_path,
"error": "File not found"
})
continue
if not _is_video_file(full_path):
failed_files.append({
"path": video_path,
"error": "Not a video file"
})
continue
                # Upload to HF Hub - preserve directory structure relative to DATASET_DIRECTORY
                repo_relative = full_path.relative_to(Path(DATASET_DIRECTORY).resolve()).as_posix()
                path_in_repo = f"videos/{repo_relative}"
logger.info(f"Uploading {video_path} to {path_in_repo}")
upload_file(
path_or_fileobj=str(full_path),
path_in_repo=path_in_repo,
repo_id=target_repo_id,
token=HF_TOKEN,
repo_type="dataset"
)
uploaded_files.append({
"local_path": video_path,
"repo_path": path_in_repo,
"size": full_path.stat().st_size
})
logger.info(f"Successfully uploaded {video_path}")
except Exception as e:
failed_files.append({
"path": video_path,
"error": str(e)
})
logger.error(f"Failed to upload {video_path}: {str(e)}")
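        # The call counts as successful if at least one file was uploaded;
        # partial failures are reported in failed_files.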
success = len(uploaded_files) > 0
total_size = sum(f["size"] for f in uploaded_files)
return {
"success": success,
"total_files_attempted": len(video_paths),
"uploaded_count": len(uploaded_files),
"failed_count": len(failed_files),
"total_size_bytes": total_size,
"dataset_repo": target_repo_id,
"dataset_created": bool(dataset_name),
"uploaded_files": uploaded_files,
"failed_files": failed_files,
"message": f"Upload completed to {target_repo_id}: {len(uploaded_files)} successful, {len(failed_files)} failed"
}
except ImportError:
return {
"success": False,
"error": "huggingface_hub library not installed. Please install it with: pip install huggingface_hub",
"uploaded_files": [],
"failed_files": []
}
except Exception as e:
return {
"success": False,
"error": f"Upload failed: {str(e)}",
"uploaded_files": [],
"failed_files": []
}


if __name__ == "__main__":
logger.info("Starting Synphony MCP Server...")
logger.info(f"Dataset directory: {DATASET_DIRECTORY}")
logger.info(f"HF Token configured: {bool(HF_TOKEN)}")
logger.info(f"HF Dataset repo: {HF_DATASET_REPO_ID or 'Not configured'}")
# Run the server
mcp.run()