We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/philogicae/fr-torrent-search-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
import shutil
import tempfile
from pathlib import Path as PathLibPath
from fastapi import FastAPI, HTTPException, Path
from fastapi.responses import FileResponse
from starlette.background import BackgroundTask
from .wrapper import FrTorrentApi, Torrent
app = FastAPI(
title="Fr Torrent Search FastAPI",
description="FastAPI server for searching torrents across multiple providers.",
)
client = FrTorrentApi()
client.ensure_initialized()
def cleanup_temp_dir(dir_path: str) -> None:
"""Safely removes a directory and its contents."""
try:
shutil.rmtree(dir_path)
except OSError as e:
print(f"Error removing temporary directory {dir_path}: {e}")
# --- API Endpoints ---
@app.get("/", summary="Health Check", tags=["General"], response_model=dict[str, str])
async def health_check() -> dict[str, str]:
"""
Endpoint to check the health of the service.
"""
return {"status": "ok"}
@app.post(
"/torrent/search",
summary="Search Torrents",
tags=["Torrents"],
response_model=list[Torrent],
)
async def search_torrents(
query: str,
max_items: int = 10,
) -> list[Torrent]:
"""
Search for torrents across multiple providers.
Corresponds to `FrTorrentApi.search_torrents()`.
"""
return client.search_torrents(query=query, max_items=max_items)
@app.get(
"/torrent/{torrent_id}",
summary="Get Torrent",
tags=["Torrents"],
)
async def get_torrent(
torrent_id: str = Path(..., description="The ID of the torrent."),
):
"""
Get a specific torrent (either magnet link, torrent file content, or torrent file path).
Corresponds to `FrTorrentApi.get_torrent()`.
"""
temp_dir_path = tempfile.mkdtemp()
try:
result = client.get_torrent(torrent_id, output_dir=temp_dir_path)
if not result:
cleanup_temp_dir(temp_dir_path)
raise HTTPException(
status_code=404, detail="Torrent not found or could not be retrieved."
)
# Handle bytes (torrent file content)
if isinstance(result, bytes):
filename = f"{torrent_id}.torrent"
full_file_path = PathLibPath(temp_dir_path) / filename
with open(full_file_path, "wb") as f:
f.write(result)
return FileResponse(
path=str(full_file_path),
media_type="application/x-bittorrent",
filename=filename,
background=BackgroundTask(cleanup_temp_dir, temp_dir_path),
)
# Handle string (magnet link or filename)
if isinstance(result, str):
if result.startswith("magnet:"):
cleanup_temp_dir(temp_dir_path)
return result
# Check if it's a filename or path
file_path = PathLibPath(result)
if not file_path.is_absolute():
file_path = PathLibPath(temp_dir_path) / result
if file_path.is_file():
return FileResponse(
path=str(file_path),
media_type="application/x-bittorrent",
filename=file_path.name,
background=BackgroundTask(cleanup_temp_dir, temp_dir_path),
)
# If it's a string but not magnet/file, just return it
cleanup_temp_dir(temp_dir_path)
return result
# Fallback for any other successful result
cleanup_temp_dir(temp_dir_path)
return result
except HTTPException:
# Re-raise HTTP exceptions
raise
except Exception as e:
cleanup_temp_dir(temp_dir_path)
raise HTTPException(
status_code=500,
detail=f"An error occurred while retrieving the torrent: {str(e)}",
) from e
@app.get(
"/torrent/{torrent_id}/magnet",
summary="Get Magnet Link",
tags=["Torrents"],
response_model=str,
)
async def get_magnet_link(
torrent_id: str = Path(..., description="The ID of the torrent."),
) -> str:
"""
Get the magnet link for a specific torrent.
Corresponds to `FrTorrentApi.get_magnet_link()`.
"""
magnet_link = client.get_magnet_link(torrent_id)
if not magnet_link:
raise HTTPException(
status_code=404, detail="Magnet link not found or could not be generated."
)
return magnet_link
@app.get(
"/torrent/{torrent_id}/file",
summary="Download .torrent File",
tags=["Torrents"],
response_class=FileResponse,
)
async def download_torrent_file(
torrent_id: str = Path(..., description="The ID of the torrent."),
) -> FileResponse:
"""
Download the .torrent file for a specific torrent.
Corresponds to `FrTorrentApi.download_torrent_file()`.
The file is downloaded to a temporary location on the server and then streamed.
The temporary file is cleaned up afterwards.
"""
temp_dir_path = None
try:
temp_dir_path = tempfile.mkdtemp()
downloaded_filename = client.download_torrent_file(
torrent_id=torrent_id, output_dir=temp_dir_path
)
if not downloaded_filename:
if temp_dir_path:
cleanup_temp_dir(temp_dir_path)
raise HTTPException(
status_code=404, detail="Torrent file not found or download failed."
)
full_file_path = PathLibPath(temp_dir_path) / downloaded_filename
if not full_file_path.is_file():
if temp_dir_path:
cleanup_temp_dir(temp_dir_path)
raise HTTPException(
status_code=500,
detail="Torrent file was not saved correctly on server.",
)
return FileResponse(
path=str(full_file_path),
media_type="application/x-bittorrent",
filename=downloaded_filename,
background=BackgroundTask(cleanup_temp_dir, temp_dir_path),
)
except Exception as e:
if temp_dir_path:
cleanup_temp_dir(temp_dir_path)
raise HTTPException(
status_code=500,
detail=f"An error occurred while processing the torrent download: {str(e)}",
) from e