
Hooktheory MCP Server

by gabguerin

get_chord_transitions

Analyze chord progression patterns and transition probabilities using Hooktheory's music theory database. Retrieve chord frequency statistics or discover likely next chords in musical compositions.

Instructions

Get chord statistics and transition probabilities from Hooktheory database.

Args:
  cp: Optional chord progression to get transitions from (e.g., "4" for chords after IV, "4,1" for chords after IV-I). If not provided, returns overall chord frequency statistics.
  key: Musical key filter (e.g., "C", "Am")
  mode: Scale mode filter ("major" or "minor")

Returns:
  JSON string containing chord nodes with chord_ID, chord_HTML (Roman numeral), probability, and child_path
  - Without cp: Shows overall chord frequencies (I=18.9%, IV=17.2%, etc.)
  - With cp: Shows what chords follow the progression (e.g., after IV: I=32.4%, V=28.9%)
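As a rough illustration of the returned structure, the sketch below parses a string of chord nodes and lists the most likely next chords. The sample payload is hypothetical: it reuses the field names and the example probabilities quoted above (after IV: I=32.4%, V=28.9%), while the chord_ID and child_path values, the representation of probabilities as fractions, and the helper name top_transitions are assumptions made for illustration. It also assumes the returned string is strict JSON.

```python
import json
from typing import List, Tuple

# Hypothetical sample of what the tool might return for cp="4".
# Field names come from the tool description; the values are illustrative.
SAMPLE_RESPONSE = json.dumps([
    {"chord_ID": "5", "chord_HTML": "V", "probability": 0.289, "child_path": "4,5"},
    {"chord_ID": "1", "chord_HTML": "I", "probability": 0.324, "child_path": "4,1"},
])


def top_transitions(payload: str, limit: int = 3) -> List[Tuple[str, float]]:
    """Return (Roman numeral, probability) pairs, most likely first."""
    nodes = json.loads(payload)
    ranked = sorted(nodes, key=lambda node: node["probability"], reverse=True)
    return [(node["chord_HTML"], node["probability"]) for node in ranked[:limit]]


for numeral, prob in top_transitions(SAMPLE_RESPONSE):
    print(f"{numeral}: {prob:.1%}")
```

Sorting on the client side keeps the sketch independent of whatever order the API happens to return nodes in.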

Input Schema

Name    Required    Description    Default
cp      No
key     No
mode    No
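All three inputs are optional, and the handler in the implementation reference forwards only the ones that are set. A minimal sketch of that mapping, using a hypothetical helper name build_params that is not part of the server:

```python
from typing import Dict, Optional


def build_params(
    cp: Optional[str] = None,
    key: Optional[str] = None,
    mode: Optional[str] = None,
) -> Dict[str, str]:
    """Collect only the arguments that were provided (empty strings are skipped too)."""
    candidates = {"cp": cp, "key": key, "mode": mode}
    return {name: value for name, value in candidates.items() if value}


# No arguments: the request carries no filters (overall chord frequencies).
print(build_params())
# Chords that follow IV-I, filtered to the major mode.
print(build_params(cp="4,1", mode="major"))
```

Because the check is a truthiness test, an empty string behaves the same as an omitted argument.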

Implementation Reference

  • Main handler function for get_chord_transitions tool. Takes optional chord progression (cp), key, and mode parameters, constructs API params, calls the Hooktheory API 'nodes' endpoint, and returns chord statistics/transition probabilities as JSON string.
    async def get_chord_transitions(
        cp: Optional[str] = None,
        key: Optional[str] = None,
        mode: Optional[str] = None,
    ) -> str:
        """
        Get chord statistics and transition probabilities from Hooktheory database.

        Args:
            cp: Optional chord progression to get transitions from
                (e.g., "4" for chords after IV, "4,1" for chords after IV-I)
                If not provided, returns overall chord frequency statistics
            key: Musical key filter (e.g., "C", "Am")
            mode: Scale mode filter ("major" or "minor")

        Returns:
            JSON string containing chord nodes with chord_ID, chord_HTML (Roman numeral),
            probability, and child_path
            - Without cp: Shows overall chord frequencies (I=18.9%, IV=17.2%, etc.)
            - With cp: Shows what chords follow the progression (e.g., after IV: I=32.4%, V=28.9%)
        """
        try:
            # Forward only the filters that were actually supplied
            params: Dict[str, Any] = {}
            if cp:
                params["cp"] = cp
            if key:
                params["key"] = key
            if mode:
                params["mode"] = mode

            result = await hooktheory_client._make_request("nodes", params)
            # Note: str() on the parsed response yields a Python repr (single quotes,
            # True/None), not strict JSON; json.dumps(result) would match the
            # documented "JSON string" return value.
            return str(result)
        except Exception as e:
            error_msg = f"Error fetching chord transitions{' for ' + cp if cp else ''}: {str(e)}"
            logger.error(error_msg)
            return error_msg
  • Tool registration: FastMCP's @mcp.tool() decorator registers get_chord_transitions with the MCP server.
    @mcp.tool()
  • The _make_request helper method in HooktheoryClient that handles authentication, rate limiting, HTTP requests with retry logic, and error handling - used by get_chord_transitions to call the Hooktheory API.
    async def _make_request(
        self, endpoint: str, params: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Make an authenticated request to the Hooktheory API with rate limiting."""
        # Ensure we have a valid token
        await self._ensure_authenticated()

        # Apply rate limiting
        await self.rate_limiter.wait()

        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "User-Agent": "Hooktheory-MCP-Server/0.2.2",
            "Content-Type": "application/json",
        }

        url = f"{self.trends_base_url}/{endpoint}"

        async with httpx.AsyncClient(timeout=30.0) as client:
            try:
                logger.debug(f"Making request to {url} with params: {params}")
                response = await client.get(url, headers=headers, params=params or {})

                if response.status_code == 429:
                    # Rate limited - apply backoff
                    self.rate_limiter.on_failure()
                    retry_after = int(response.headers.get("Retry-After", "60"))
                    logger.warning(
                        f"Rate limited. Waiting {retry_after} seconds before retry"
                    )
                    await asyncio.sleep(retry_after)

                    # Retry once after rate limit
                    await self.rate_limiter.wait()
                    response = await client.get(
                        url, headers=headers, params=params or {}
                    )

                if response.status_code == 401:
                    # Token might be expired, force re-authentication
                    logger.info("Received 401, forcing re-authentication")
                    self.access_token = None
                    self.token_expires_at = None
                    await self._ensure_authenticated()

                    # Update headers with new token
                    headers["Authorization"] = f"Bearer {self.access_token}"

                    # Retry with new token
                    await self.rate_limiter.wait()
                    response = await client.get(
                        url, headers=headers, params=params or {}
                    )

                response.raise_for_status()
                self.rate_limiter.on_success()

                result = response.json()
                logger.debug(
                    f"Request successful: {len(str(result))} characters returned"
                )
                return result

            except httpx.HTTPStatusError as e:
                self.rate_limiter.on_failure()
                logger.error(f"HTTP {e.response.status_code} error calling {url}: {e}")
                raise
            except httpx.RequestError as e:
                self.rate_limiter.on_failure()
                logger.error(f"Request error calling {url}: {e}")
                raise
            except Exception as e:
                self.rate_limiter.on_failure()
                logger.error(f"Unexpected error calling {url}: {e}")
                raise
  • HooktheoryClient class initialization that sets up the API base URLs, retrieves credentials from environment variables, and initializes the rate limiter used by get_chord_transitions.
    class HooktheoryClient:
        """Client for interacting with the Hooktheory API with OAuth 2 authentication."""

        def __init__(self, base_url: str = "https://api.hooktheory.com/v1"):
            self.base_url = base_url
            self.trends_base_url = f"{base_url}/trends"
            self.username = os.getenv("HOOKTHEORY_USERNAME")
            self.password = os.getenv("HOOKTHEORY_PASSWORD")

            # Token management
            self.access_token: Optional[str] = None
            self.user_id: Optional[int] = None
            self.token_expires_at: Optional[float] = None

            # Rate limiting
            self.rate_limiter = RateLimiter(
                max_requests_per_second=1.5
            )  # Conservative rate
  • RateLimiter helper class that implements exponential backoff for API rate limiting, used by HooktheoryClient to respect Hooktheory API rate limits during get_chord_transitions calls.
    class RateLimiter:
        """Simple rate limiter with exponential backoff."""

        def __init__(self, max_requests_per_second: float = 2.0):
            self.max_requests_per_second = max_requests_per_second
            self.min_interval = 1.0 / max_requests_per_second
            self.last_request_time = 0.0
            self.backoff_delay = 0.0
            self.consecutive_failures = 0

        async def wait(self):
            """Wait if necessary to respect rate limits."""
            current_time = time.time()
            time_since_last = current_time - self.last_request_time

            # Apply backoff delay if we have consecutive failures
            total_delay = max(self.min_interval - time_since_last, self.backoff_delay)

            if total_delay > 0:
                await asyncio.sleep(total_delay)

            self.last_request_time = time.time()

        def on_success(self):
            """Reset backoff on successful request."""
            self.consecutive_failures = 0
            self.backoff_delay = 0.0

        def on_failure(self):
            """Increase backoff delay on failed request."""
            self.consecutive_failures += 1
            # Exponential backoff: 1s, 2s, 4s, 8s, max 60s
            self.backoff_delay = min(60.0, 2 ** (self.consecutive_failures - 1))
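To make the backoff arithmetic concrete, the sketch below restates the on_failure formula and the delay selection from wait() as standalone functions that do not sleep. The function names backoff_delay and next_delay are hypothetical and exist only for this illustration; they are not part of the server.

```python
def backoff_delay(consecutive_failures: int, cap: float = 60.0) -> float:
    """Delay after N consecutive failures: 2 ** (N - 1) seconds, capped at `cap`."""
    if consecutive_failures <= 0:
        return 0.0
    return min(cap, float(2 ** (consecutive_failures - 1)))


def next_delay(min_interval: float, time_since_last: float, backoff: float) -> float:
    """Larger of the pacing gap and the backoff, as in wait().

    Clamped at zero here, which corresponds to wait() skipping the sleep.
    """
    return max(min_interval - time_since_last, backoff, 0.0)


# Delays for the first eight consecutive failures.
schedule = [backoff_delay(n) for n in range(1, 9)]
print(schedule)
```

With the 60-second cap, the seventh consecutive failure is the first one to hit the ceiling (2 ** 6 = 64 seconds is clipped to 60).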

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gabguerin/hooktheory-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.