Skip to main content
Glama
gabguerin

Hooktheory MCP Server

by gabguerin

get_chord_transitions

Analyze chord progression patterns and transition probabilities using Hooktheory's music theory database. Retrieve chord frequency statistics or discover likely next chords in musical compositions.

Instructions

Get chord statistics and transition probabilities from Hooktheory database.

Args:
    cp: Optional chord progression to get transitions from (e.g., "4" for chords after IV, "4,1" for chords after IV-I)
        If not provided, returns overall chord frequency statistics
    key: Musical key filter (e.g., "C", "Am")
    mode: Scale mode filter ("major" or "minor")

Returns:
    JSON string containing chord nodes with chord_ID, chord_HTML (Roman numeral), probability, and child_path
    - Without cp: Shows overall chord frequencies (I=18.9%, IV=17.2%, etc.)
    - With cp: Shows what chords follow the progression (e.g., after IV: I=32.4%, V=28.9%)

Input Schema

Table | JSON Schema

| Name | Required | Description | Default |
|------|----------|-------------|---------|
| cp   | No       |             |         |
| key  | No       |             |         |
| mode | No       |             |         |

Implementation Reference

  • Main handler function for get_chord_transitions tool. Takes optional chord progression (cp), key, and mode parameters, constructs API params, calls the Hooktheory API 'nodes' endpoint, and returns chord statistics/transition probabilities as JSON string.
    async def get_chord_transitions(
        cp: Optional[str] = None,
        key: Optional[str] = None,
        mode: Optional[str] = None,
    ) -> str:
        """
        Get chord statistics and transition probabilities from Hooktheory database.

        Args:
            cp: Optional chord progression to get transitions from (e.g., "4" for chords after IV, "4,1" for chords after IV-I)
                If not provided, returns overall chord frequency statistics
            key: Musical key filter (e.g., "C", "Am")
            mode: Scale mode filter ("major" or "minor")

        Returns:
            JSON string containing chord nodes with chord_ID, chord_HTML (Roman numeral), probability, and child_path
            - Without cp: Shows overall chord frequencies (I=18.9%, IV=17.2%, etc.)
            - With cp: Shows what chords follow the progression (e.g., after IV: I=32.4%, V=28.9%)

            If the request fails, a plain-text error message (not JSON) is
            returned instead of raising.
        """
        # Local import keeps this function self-contained regardless of what
        # the module imports at top level.
        import json

        try:
            # Forward only the filters the caller actually supplied; empty
            # strings are treated the same as None.
            params: Dict[str, Any] = {}
            if cp:
                params["cp"] = cp
            if key:
                params["key"] = key
            if mode:
                params["mode"] = mode

            result = await hooktheory_client._make_request("nodes", params)
            # str(result) would produce a Python repr (single quotes,
            # None/True/False), which is not valid JSON. Serialize explicitly
            # so the output matches the documented "JSON string" contract.
            return json.dumps(result, ensure_ascii=False)

        except Exception as e:
            # Tool boundary: report the failure as text rather than propagate.
            error_msg = f"Error fetching chord transitions{' for ' + cp if cp else ''}: {str(e)}"
            logger.error(error_msg)
            return error_msg
  • Tool registration decorator that registers get_chord_transitions with the MCP server via FastMCP's @mcp.tool() decorator.
    @mcp.tool()
  • The _make_request helper method in HooktheoryClient that handles authentication, rate limiting, HTTP requests with retry logic, and error handling - used by get_chord_transitions to call the Hooktheory API.
    async def _make_request(
        self, endpoint: str, params: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Make an authenticated request to the Hooktheory API with rate limiting.

        Issues a GET to ``{self.trends_base_url}/{endpoint}``. A 429 response
        is retried once after sleeping for the server's ``Retry-After`` value,
        and a 401 response is retried once after forcing re-authentication.

        Args:
            endpoint: Path segment appended to ``self.trends_base_url``.
            params: Optional query parameters; ``None`` is sent as no params.

        Returns:
            The decoded JSON body of the final response.

        Raises:
            httpx.HTTPStatusError: The final response has an error status.
            httpx.RequestError: The request itself failed.
            Exception: Any other error is logged and re-raised unchanged.
        """

        # Ensure we have a valid token
        # NOTE(review): _ensure_authenticated is defined outside this view;
        # presumably it populates self.access_token -- confirm.
        await self._ensure_authenticated()

        # Apply rate limiting
        await self.rate_limiter.wait()

        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "User-Agent": "Hooktheory-MCP-Server/0.2.2",
            "Content-Type": "application/json",
        }

        url = f"{self.trends_base_url}/{endpoint}"

        async with httpx.AsyncClient(timeout=30.0) as client:
            try:
                logger.debug(f"Making request to {url} with params: {params}")
                response = await client.get(url, headers=headers, params=params or {})

                if response.status_code == 429:
                    # Rate limited - apply backoff
                    self.rate_limiter.on_failure()
                    # Retry-After may be delta-seconds or an HTTP-date. Only
                    # the integer form is understood here; anything else falls
                    # back to the 60s default instead of aborting the retry
                    # with a ValueError.
                    try:
                        retry_after = int(response.headers.get("Retry-After", "60"))
                    except ValueError:
                        retry_after = 60
                    logger.warning(
                        f"Rate limited. Waiting {retry_after} seconds before retry"
                    )
                    await asyncio.sleep(retry_after)

                    # Retry once after rate limit
                    await self.rate_limiter.wait()
                    response = await client.get(
                        url, headers=headers, params=params or {}
                    )

                if response.status_code == 401:
                    # Token might be expired, force re-authentication
                    logger.info("Received 401, forcing re-authentication")
                    self.access_token = None
                    self.token_expires_at = None
                    await self._ensure_authenticated()

                    # Update headers with new token
                    headers["Authorization"] = f"Bearer {self.access_token}"

                    # Retry with new token
                    await self.rate_limiter.wait()
                    response = await client.get(
                        url, headers=headers, params=params or {}
                    )

                # Any remaining error status (including a second 429 or 401)
                # surfaces here as httpx.HTTPStatusError.
                response.raise_for_status()
                self.rate_limiter.on_success()

                result = response.json()
                logger.debug(
                    f"Request successful: {len(str(result))} characters returned"
                )
                return result

            except httpx.HTTPStatusError as e:
                self.rate_limiter.on_failure()
                logger.error(f"HTTP {e.response.status_code} error calling {url}: {e}")
                raise
            except httpx.RequestError as e:
                self.rate_limiter.on_failure()
                logger.error(f"Request error calling {url}: {e}")
                raise
            except Exception as e:
                self.rate_limiter.on_failure()
                logger.error(f"Unexpected error calling {url}: {e}")
                raise
  • HooktheoryClient class initialization that sets up the API base URLs, retrieves credentials from environment variables, and initializes the rate limiter used by get_chord_transitions.
    class HooktheoryClient:
        """Hooktheory API client holding endpoints, credentials, token state and a rate limiter."""

        def __init__(self, base_url: str = "https://api.hooktheory.com/v1"):
            # API roots: the trends endpoints live under "<base_url>/trends".
            self.base_url = base_url
            self.trends_base_url = f"{self.base_url}/trends"

            # Credentials come from the process environment; either is None
            # when the corresponding variable is unset.
            environment = os.environ
            self.username = environment.get("HOOKTHEORY_USERNAME")
            self.password = environment.get("HOOKTHEORY_PASSWORD")

            # Token state starts empty and is filled in elsewhere in the class.
            self.access_token: Optional[str] = None
            self.user_id: Optional[int] = None
            self.token_expires_at: Optional[float] = None

            # Deliberately conservative request rate for the shared limiter.
            conservative_rate = 1.5
            self.rate_limiter = RateLimiter(max_requests_per_second=conservative_rate)
  • RateLimiter helper class that implements exponential backoff for API rate limiting, used by HooktheoryClient to respect Hooktheory API rate limits during get_chord_transitions calls.
    class RateLimiter:
        """Simple rate limiter with exponential backoff.

        ``wait()`` spaces calls at least ``min_interval`` seconds apart, or by
        the current ``backoff_delay`` when that is larger. ``on_failure()``
        grows the backoff and ``on_success()`` resets it.
        """

        # Exponent at which 2 ** n first exceeds the 60s cap (2 ** 6 == 64).
        # Clamping here avoids building huge integers after many failures
        # while producing exactly the same delays.
        _MAX_BACKOFF_EXPONENT = 6

        def __init__(self, max_requests_per_second: float = 2.0):
            self.max_requests_per_second = max_requests_per_second
            # Minimum spacing between two requests, in seconds.
            self.min_interval = 1.0 / max_requests_per_second
            # Monotonic-clock reading taken at the end of the last wait().
            self.last_request_time = 0.0
            # Extra delay currently imposed because of recent failures.
            self.backoff_delay = 0.0
            self.consecutive_failures = 0

        async def wait(self):
            """Wait if necessary to respect rate limits."""
            # Use the monotonic clock: wall-clock time can jump (NTP, manual
            # changes), and a backwards step would otherwise turn into an
            # arbitrarily long sleep here.
            current_time = time.monotonic()
            time_since_last = current_time - self.last_request_time

            # Apply backoff delay if we have consecutive failures
            total_delay = max(self.min_interval - time_since_last, self.backoff_delay)

            if total_delay > 0:
                await asyncio.sleep(total_delay)

            self.last_request_time = time.monotonic()

        def on_success(self):
            """Reset backoff on successful request."""
            self.consecutive_failures = 0
            self.backoff_delay = 0.0

        def on_failure(self):
            """Increase backoff delay on failed request."""
            self.consecutive_failures += 1
            # Exponential backoff: 1s, 2s, 4s, 8s, max 60s
            exponent = min(self.consecutive_failures - 1, self._MAX_BACKOFF_EXPONENT)
            self.backoff_delay = min(60.0, 2 ** exponent)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gabguerin/hooktheory-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.