Skip to main content
Glama
server.py27.4 kB
import os import httpx import time import asyncio import random from typing import Optional, Dict, Any, List, Union, Literal from mcp.server.fastmcp import FastMCP import sys import json from pathlib import Path # Initialize MCP server instance mcp = FastMCP("Flyworks-API") # Get API key from environment variable api_key = os.environ.get("FLYWORKS_API_TOKEN") # Flyworks API base URL (updated to v2) api_base_url = os.environ.get("FLYWORKS_API_BASE_URL", "https://hfw-api.hifly.cc/api/v2/hifly") # base path for output files output_base_path = os.environ.get("FLYWORKS_OUTPUT_BASE_PATH", "./output") # Default output path for videos DEFAULT_OUTPUT_PATH = os.path.join(output_base_path, "output.mp4") def get_auth_headers(): """Generate authentication headers for API requests""" return { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } async def upload_file(file_path: str) -> Dict[str, Any]: """ Upload a local file to Flyworks server Args: file_path: Path to the local file Returns: Dictionary containing file_id if successful, or error information """ # Get file extension file_extension = os.path.splitext(file_path)[1].lstrip('.') if not file_extension: return {"error": "File has no extension"} # Get content type based on extension content_types = { "mp4": "video/mp4", "mov": "video/quicktime", "mp3": "audio/mpeg", "m4a": "audio/mp4", "wav": "audio/wav", "jpg": "image/jpeg", "jpeg": "image/jpeg", "png": "image/png" } content_type = content_types.get(file_extension.lower()) if not content_type: return {"error": f"Unsupported file extension: {file_extension}"} try: # Step 1: Get upload URL async with httpx.AsyncClient() as client: response = await client.post( f"{api_base_url}/tool/create_upload_url", headers=get_auth_headers(), json={"file_extension": file_extension}, timeout=30 ) if response.status_code != 200: return { "error": f"Failed to get upload URL: {response.status_code}", "details": response.json() } upload_data = response.json() upload_url = upload_data.get("upload_url") content_type = upload_data.get("content_type") file_id = upload_data.get("file_id") if not upload_url or not file_id: return {"error": "Invalid response from create_upload_url endpoint"} # Step 2: Upload file to the URL with open(file_path, "rb") as file: file_content = file.read() upload_response = await client.put( upload_url, headers={"Content-Type": content_type}, content=file_content ) if upload_response.status_code not in [200, 204]: return { "error": f"File upload failed: {upload_response.status_code}", "details": upload_response.text } return {"file_id": file_id} except Exception as e: return {"error": f"File upload failed: {str(e)}"} async def wait_for_task_completion(task_id: str, timeout: int = 600, interval: int = 10) -> Dict[str, Any]: """ Wait for a task to complete Args: task_id: ID of the task to wait for timeout: Maximum time to wait in seconds interval: Polling interval in seconds Returns: Task result or error information """ start_time = time.time() while time.time() - start_time < timeout: try: async with httpx.AsyncClient() as client: response = await client.get( f"{api_base_url}/video/task", headers=get_auth_headers(), params={"task_id": task_id}, timeout=30 ) if response.status_code != 200: return { "error": f"Failed to check task status: {response.status_code}", "details": response.json() } task_data = response.json() status = task_data.get("status") # Status: 1=Waiting, 2=Processing, 3=Completed, 4=Failed if status == 3: # Completed return task_data elif status == 4: # Failed return { "error": "Task failed", "details": task_data } # Still processing, wait for next check await asyncio.sleep(interval) except Exception as e: return {"error": f"Error checking task status: {str(e)}"} return {"error": f"Task timed out after {timeout} seconds"} async def download_video(video_url: str, output_path: str) -> Dict[str, Any]: """ Download a video from URL to a local file Args: video_url: URL of the video to download output_path: Where to save the downloaded video Returns: Success message or error information """ try: async with httpx.AsyncClient() as client: response = await client.get(video_url, timeout=60) if response.status_code != 200: return { "error": f"Video download failed: {response.status_code}", "details": response.text } with open(output_path, "wb") as f: f.write(response.content) return { "success": True, "message": f"Video downloaded to {output_path}", "output_path": output_path } except Exception as e: return {"error": f"Video download failed: {str(e)}"} async def get_voice_list(voice_type: Optional[int] = None) -> Dict[str, Any]: """ Get list of available voices from Flyworks API Args: voice_type: Optional filter for voice type (1=self-cloned, 2=public) Returns: Dictionary containing voice list if successful, or error information """ # API key validation if not api_key: return {"error": "API key not found. Please set FLYWORKS_API_TOKEN environment variable."} try: params = {} if voice_type is not None: params["kind"] = voice_type async with httpx.AsyncClient() as client: response = await client.get( f"{api_base_url}/voice/list", headers=get_auth_headers(), params=params, timeout=30 ) if response.status_code != 200: return { "error": f"Failed to get voice list: {response.status_code}", "details": response.json() } return response.json() except Exception as e: return {"error": f"Failed to get voice list: {str(e)}"} async def create_avatar( video_url: Optional[str] = None, image_url: Optional[str] = None, video_file: Optional[str] = None, image_file: Optional[str] = None, title: Optional[str] = None, model: int = 2 ) -> Dict[str, Any]: """ Create a digital human avatar using video or image Args: video_url: Remote URL of the video file image_url: Remote URL of the image file video_file: Local path to the video file image_file: Local path to the image file title: Title for the avatar model: Model type, 1: Video 2.0, 2: Video 2.1, default 2 Returns: Dictionary containing avatar ID if successful, or error information """ # API key validation if not api_key: return {"error": "API key not found. Please set FLYWORKS_API_TOKEN environment variable."} # Parameter validation video_options = sum(1 for x in [video_url, video_file] if x) image_options = sum(1 for x in [image_url, image_file] if x) if video_options + image_options != 1: return {"error": "Exactly one of video_url, video_file, image_url, or image_file must be provided"} # Determine creation method create_by_video = video_options > 0 # Prepare payload payload = { "title": title or "Generated Avatar", } if create_by_video: endpoint = f"{api_base_url}/avatar/create_by_video" if video_file: upload_result = await upload_file(video_file) if "error" in upload_result: return upload_result file_id = upload_result.get("file_id") payload["file_id"] = file_id else: payload["video_url"] = video_url else: endpoint = f"{api_base_url}/avatar/create_by_image" if image_file: upload_result = await upload_file(image_file) if "error" in upload_result: return upload_result file_id = upload_result.get("file_id") payload["file_id"] = file_id else: payload["image_url"] = image_url payload["model"] = model # Create the avatar try: async with httpx.AsyncClient() as client: response = await client.post( endpoint, headers=get_auth_headers(), json=payload, timeout=30 ) if response.status_code != 200: return { "error": f"API request failed with status {response.status_code}", "details": response.json() } response_data = response.json() task_id = response_data.get("task_id") if not task_id: return {"error": "No task_id in response", "details": response_data} # Wait for avatar creation to complete avatar_result = await wait_for_avatar_creation(task_id) if "error" in avatar_result: return avatar_result return { "avatar": avatar_result.get("avatar"), "message": f"Avatar created successfully using {'video' if create_by_video else 'image'}" } except httpx.RequestError as e: return {"error": f"Request error: {str(e)}"} except Exception as e: return {"error": f"Unexpected error: {str(e)}"} async def wait_for_avatar_creation(task_id: str, timeout: int = 600, interval: int = 10) -> Dict[str, Any]: """ Wait for an avatar creation task to complete Args: task_id: ID of the task to wait for timeout: Maximum time to wait in seconds interval: Polling interval in seconds Returns: Task result or error information """ start_time = time.time() while time.time() - start_time < timeout: try: async with httpx.AsyncClient() as client: response = await client.get( f"{api_base_url}/avatar/task", headers=get_auth_headers(), params={"task_id": task_id}, timeout=30 ) if response.status_code != 200: return { "error": f"Failed to check avatar task status: {response.status_code}", "details": response.json() } task_data = response.json() status = task_data.get("status") # Status: 1=Waiting, 2=Processing, 3=Completed, 4=Failed if status == 3: # Completed return task_data elif status == 4: # Failed return { "error": "Avatar creation failed", "details": task_data } # Still processing, wait for next check await asyncio.sleep(interval) except Exception as e: return {"error": f"Error checking avatar task status: {str(e)}"} return {"error": f"Avatar creation timed out after {timeout} seconds"} @mcp.tool( description=""" Create lipsync video by audio input. The tool will animate a digital human avatar to speak in sync with the provided audio. Parameters: - avatar: Digital human avatar ID. Either this or avatar creation parameters must be provided. - avatar_video_url: URL of a video to create the avatar from. - avatar_image_url: URL of an image to create the avatar from. - avatar_video_file: Local path to a video file to create the avatar from. - avatar_image_file: Local path to an image file to create the avatar from. - audio_url: Remote URL of the audio file. One of audio_url or audio_file must be provided. - audio_file: Local path to the audio file. One of audio_url or audio_file must be provided. - title: Optional title for the created video. - async_mode: If true, returns task_id immediately. If false, waits for completion and downloads the video. Default is false. - output_path: Where to save the downloaded video if async_mode is false. Default is "output.mp4". Avatar creation: Provide exactly ONE of avatar_video_url, avatar_image_url, avatar_video_file, or avatar_image_file to create a new avatar. If avatar ID is directly provided, these parameters will be ignored. Audio file should be mp3, m4a, or wav format, within 20MB, and between 1 second and 3 minutes. Returns: - If async_mode is true: task_id for checking status later - If async_mode is false: downloaded video path and task result - If download fails: error message, task_id, task_result, and video_url for manual download """ ) async def create_lipsync_video_by_audio( avatar: Optional[str] = None, avatar_video_url: Optional[str] = None, avatar_image_url: Optional[str] = None, avatar_video_file: Optional[str] = None, avatar_image_file: Optional[str] = None, audio_url: Optional[str] = None, audio_file: Optional[str] = None, title: Optional[str] = None, async_mode: bool = False, output_path: str = DEFAULT_OUTPUT_PATH ): # API key validation if not api_key: return {"error": "API key not found. Please set FLYWORKS_API_TOKEN environment variable."} # Avatar handling avatar_created = False avatar_options = sum(1 for x in [avatar_video_url, avatar_image_url, avatar_video_file, avatar_image_file] if x) if not avatar and avatar_options == 0: return {"error": "Either avatar ID or avatar creation parameters must be provided"} if not avatar and avatar_options > 0: # Create avatar using provided parameters avatar_result = await create_avatar( video_url=avatar_video_url, image_url=avatar_image_url, video_file=avatar_video_file, image_file=avatar_image_file, title=f"Avatar for {title or 'Lipsync Video'}" ) if "error" in avatar_result: return avatar_result avatar = avatar_result.get("avatar") avatar_created = True if not avatar: return {"error": "Failed to get avatar ID after creation"} # Audio parameter validation if not audio_url and not audio_file: return {"error": "Either audio_url or audio_file must be provided"} # Prepare payload payload = { "avatar": avatar, "title": title or "Lipsync Video" } # Handle local audio file file_id = None if audio_file and not audio_url: upload_result = await upload_file(audio_file) if "error" in upload_result: return upload_result file_id = upload_result.get("file_id") payload["file_id"] = file_id elif audio_url: payload["audio_url"] = audio_url # Create the lipsync video try: async with httpx.AsyncClient() as client: response = await client.post( f"{api_base_url}/video/create_by_audio", headers=get_auth_headers(), json=payload, timeout=30 ) if response.status_code != 200: return { "error": f"API request failed with status {response.status_code}", "details": response.json() } response_data = response.json() task_id = response_data.get("task_id") if not task_id: return {"error": "No task_id in response", "details": response_data} # If async mode, just return the task ID if async_mode: result = { "task_id": task_id, "message": "Task created successfully. Use task_id to check completion using the /video/task API endpoint.", "async_mode": True } if avatar_created: result["created_avatar"] = avatar return result # If sync mode, wait for completion and download task_result = await wait_for_task_completion(task_id) if "error" in task_result: return task_result # Get video URL from task result video_url = task_result.get("video_Url") if not video_url: return {"error": "No video URL in task result", "details": task_result} # Download the video download_result = await download_video(video_url, output_path) if "error" in download_result: # 如果下载失败,仍然返回视频URL return { "error": download_result["error"], "video_url": video_url, "task_id": task_id, "task_result": task_result, "message": "Video download failed, but you can manually download it using the provided video_url" } # Return success response result = { "async_mode": False, "task_id": task_id, "task_result": task_result, "download_result": download_result } if avatar_created: result["created_avatar"] = avatar return result except httpx.RequestError as e: return {"error": f"Request error: {str(e)}"} except Exception as e: return {"error": f"Unexpected error: {str(e)}"} @mcp.tool( description=""" Create lipsync video by text input. The tool will generate audio from the text and animate a digital human avatar to speak it. Parameters: - avatar: Digital human avatar ID. Either this or avatar creation parameters must be provided. - avatar_video_url: URL of a video to create the avatar from. - avatar_image_url: URL of an image to create the avatar from. - avatar_video_file: Local path to a video file to create the avatar from. - avatar_image_file: Local path to an image file to create the avatar from. - text: Text content to be spoken by the avatar. Required. - voice: Voice ID to use for text-to-speech. If not provided, a random voice will be selected. - title: Optional title for the created video. - async_mode: If true, returns task_id immediately. If false, waits for completion and downloads the video. Default is false. - output_path: Where to save the downloaded video if async_mode is false. Default is "output.mp4". Avatar creation: Provide exactly ONE of avatar_video_url, avatar_image_url, avatar_video_file, or avatar_image_file to create a new avatar. If avatar ID is directly provided, these parameters will be ignored. Text should be between 1 and 500 characters. Returns: - If async_mode is true: task_id for checking status later, selected voice ID - If async_mode is false: downloaded video path, task result, and selected voice ID - If download fails: error message, task_id, task_result, video_url, and selected voice ID for manual download """ ) async def create_lipsync_video_by_text( avatar: Optional[str] = None, avatar_video_url: Optional[str] = None, avatar_image_url: Optional[str] = None, avatar_video_file: Optional[str] = None, avatar_image_file: Optional[str] = None, text: str = "", voice: Optional[str] = None, title: Optional[str] = None, async_mode: bool = False, output_path: str = DEFAULT_OUTPUT_PATH ): # API key validation if not api_key: return {"error": "API key not found. Please set FLYWORKS_API_TOKEN environment variable."} # Text parameter validation if not text: return {"error": "text parameter is required"} # Avatar handling avatar_created = False avatar_options = sum(1 for x in [avatar_video_url, avatar_image_url, avatar_video_file, avatar_image_file] if x) if not avatar and avatar_options == 0: return {"error": "Either avatar ID or avatar creation parameters must be provided"} if not avatar and avatar_options > 0: # Create avatar using provided parameters avatar_result = await create_avatar( video_url=avatar_video_url, image_url=avatar_image_url, video_file=avatar_video_file, image_file=avatar_image_file, title=f"Avatar for {title or 'Text-driven Lipsync Video'}" ) if "error" in avatar_result: return avatar_result avatar = avatar_result.get("avatar") avatar_created = True if not avatar: return {"error": "Failed to get avatar ID after creation"} # If voice is not provided, get a random voice from the voice list if not voice: # Randomly select a public voice (type=2) voice_list_result = await get_voice_list(voice_type=2) if "error" in voice_list_result: return voice_list_result voices = voice_list_result.get("data", []) if not voices: return {"error": "No public voices available"} # Randomly select a public voice random_voice = random.choice(voices) voice = random_voice.get("voice") print(f"Selected voice: {voice}") if not voice: return {"error": "Failed to get a valid voice ID"} # Prepare payload payload = { "avatar": avatar, "text": text, "voice": voice, "title": title or "Text-driven Lipsync Video" } # Create the lipsync video try: async with httpx.AsyncClient() as client: response = await client.post( f"{api_base_url}/video/create_by_tts", headers=get_auth_headers(), json=payload, timeout=30 ) if response.status_code != 200: return { "error": f"API request failed with status {response.status_code}", "details": response.json() } response_data = response.json() task_id = response_data.get("task_id") if not task_id: return {"error": "No task_id in response", "details": response_data} # If async mode, just return the task ID if async_mode: result = { "task_id": task_id, "message": "Task created successfully. Use task_id to check completion using the /video/task API endpoint.", "async_mode": True, "selected_voice": voice } if avatar_created: result["created_avatar"] = avatar return result # If sync mode, wait for completion and download task_result = await wait_for_task_completion(task_id) if "error" in task_result: return task_result # Get video URL from task result video_url = task_result.get("video_Url") if not video_url: return {"error": "No video URL in task result", "details": task_result} # Download the video download_result = await download_video(video_url, output_path) if "error" in download_result: # 如果下载失败,仍然返回视频URL return { "error": download_result["error"], "video_url": video_url, "task_id": task_id, "task_result": task_result, "selected_voice": voice, "message": "Video download failed, but you can manually download it using the provided video_url" } # Return success response result = { "async_mode": False, "task_id": task_id, "task_result": task_result, "download_result": download_result, "selected_voice": voice } if avatar_created: result["created_avatar"] = avatar return result except httpx.RequestError as e: return {"error": f"Request error: {str(e)}"} except Exception as e: return {"error": f"Unexpected error: {str(e)}"} def main(): """Start the Flyworks MCP server""" if not api_key: print("Warning: FLYWORKS_API_TOKEN environment variable not set.", file=os.sys.stderr) print("Set it to use the Flyworks API properly.", file=os.sys.stderr) if "FLYWORKS_API_BASE_URL" not in os.environ: print(f"Note: Using default API base URL: {api_base_url}", file=os.sys.stderr) print("Starting Flyworks MCP server...", file=os.sys.stderr) mcp.run() if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Flyworks-AI/flyworks-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server