Skip to main content
Glama
api_client.py6.07 kB
from os import getenv, makedirs from pathlib import Path from sys import argv from typing import Any from aiocache import cached from ygg_torrent import ygg_api from .models import Cache, Torrent from .scraper import WEBSITES, search_torrents PREFER_TORRENT_FILES: bool = str(getenv("PREFER_TORRENT_FILES")).lower() == "true" FOLDER_TORRENT_FILES: Path = Path(getenv("FOLDER_TORRENT_FILES") or "./torrents") makedirs(FOLDER_TORRENT_FILES, exist_ok=True) EXCLUDE_SOURCES: set[str] = set() SOURCES: list[str] = ["yggtorrent"] + list(WEBSITES.keys()) YGG_ENABLED: bool = len(str(getenv("YGG_PASSKEY"))) == 32 if not YGG_ENABLED: EXCLUDE_SOURCES.add("yggtorrent") if excluded_sources := getenv("EXCLUDE_SOURCES"): EXCLUDE_SOURCES = EXCLUDE_SOURCES.union( {source.lower().strip() for source in excluded_sources.split(",")} ) SOURCES = [source for source in SOURCES if source not in EXCLUDE_SOURCES] def key_builder( _namespace: str, _fn: Any, *args: tuple[Any], **kwargs: dict[str, Any] ) -> str: key = { "query": args[0] if len(args) > 0 else "", "max_items": args[1] if len(args) > 1 else 10, } | kwargs return str(key) class TorrentSearchApi: """A client for searching torrents on ThePirateBay, Nyaa and YGG Torrent.""" CACHE: Cache = Cache() def available_sources(self) -> list[str]: """Get the list of available torrent sources.""" return SOURCES @cached(ttl=300, key_builder=key_builder) # type: ignore[untyped-decorator] # 5min async def search_torrents( self, query: str, max_items: int = 10, ) -> list[Torrent]: """ Search for torrents on ThePirateBay, Nyaa and YGG Torrent. Args: query: Search query. max_items: Maximum number of items to return. Returns: A list of torrent results. """ found_torrents: list[Torrent] = [] if any(source != "yggtorrent" for source in SOURCES): found_torrents.extend(await search_torrents(query, SOURCES)) if "yggtorrent" in SOURCES: found_torrents.extend( [ Torrent.format(**torrent.model_dump(), source="yggtorrent") for torrent in ygg_api.search_torrents(query) ] ) found_torrents = list( sorted( found_torrents, key=lambda torrent: torrent.seeders + torrent.leechers, reverse=True, ) )[:max_items] for torrent in found_torrents: torrent.prepend_info(query, max_items) self.CACHE.clean() # Clean cache routine self.CACHE.update(found_torrents) return found_torrents async def get_torrent_details(self, torrent_id: str) -> Torrent | None: """ Get details about a previously found torrent. Args: torrent_id: The ID of the torrent. Returns: Detailed torrent result or None. """ found_torrent: Torrent | None = self.CACHE.get(torrent_id) try: query, max_items, source, ref_id = Torrent.extract_info(torrent_id) except Exception: print(f"Invalid torrent ID: {torrent_id}") return None if source == "yggtorrent": if not found_torrent: # Missing or uncached ygg_torrent = ygg_api.get_torrent_details( int(ref_id), with_magnet_link=not PREFER_TORRENT_FILES ) if ygg_torrent: found_torrent = Torrent.format( **ygg_torrent.model_dump(), source="yggtorrent" ) found_torrent.prepend_info(query, max_items) if found_torrent: if PREFER_TORRENT_FILES: if not found_torrent.torrent_file: filename = ygg_api.download_torrent_file( int(ref_id), output_dir=FOLDER_TORRENT_FILES ) if filename: found_torrent.torrent_file = str( FOLDER_TORRENT_FILES / filename ) elif not found_torrent.magnet_link: # Cached but missing magnet link found_torrent.magnet_link = ygg_api.get_magnet_link(int(ref_id)) elif not found_torrent: # Missing or uncached torrents: list[Torrent] = await self.search_torrents(query, max_items) found_torrent = next( (torrent for torrent in torrents if torrent.id == torrent_id), None ) self.CACHE.clean() # Clean cache routine return found_torrent async def get_magnet_link_or_torrent_file(self, torrent_id: str) -> str | None: """ Get the magnet link or torrent filepath for a previously found torrent. Args: torrent_id: The ID of the torrent. Returns: The magnet link or torrent filepath as a string, else None. """ found_torrent: Torrent | None = await self.get_torrent_details(torrent_id) if found_torrent: if PREFER_TORRENT_FILES and found_torrent.torrent_file: return found_torrent.torrent_file elif found_torrent.magnet_link: return found_torrent.magnet_link return None if __name__ == "__main__": async def main() -> None: query = argv[1] if len(argv) > 1 else None if not query: print("Please provide a search query.") exit(1) client = TorrentSearchApi() torrents: list[Torrent] = await client.search_torrents(query, max_items=5) if torrents: for torrent in torrents: print(await client.get_magnet_link_or_torrent_file(torrent.id)) else: print("No torrents found") from asyncio import run run(main())

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/philogicae/torrent-search-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server