Skip to main content
Glama

Tabcorp API MCP Server

by bencousins22
server.py.backup10.5 kB
""" Smithery MCP server: Tabcorp OAuth (password grant + refresh) and querying tools - Configure session with your credentials if desired (client_id/secret, username, password, jurisdiction) - Use tools to obtain access/refresh tokens and make API requests """ from __future__ import annotations import time from typing import Any, Dict, Optional import httpx from mcp.server.fastmcp import Context, FastMCP from pydantic import BaseModel, Field from smithery.decorators import smithery TAB_BASE_URL_DEFAULT = "https://api.beta.tab.com.au" OAUTH_TOKEN_PATH = "/oauth/token" USER_AGENT = "tab-mcp/0.1 (+smithery)" class ConfigSchema(BaseModel): client_id: Optional[str] = Field(None, description="Tabcorp OAuth client_id") client_secret: Optional[str] = Field(None, description="Tabcorp OAuth client_secret") username: Optional[str] = Field(None, description="TAB account number (for password grant)") password: Optional[str] = Field(None, description="TAB account password (for password grant)") refresh_token: Optional[str] = Field(None, description="Optional cached refresh token") jurisdiction: str = Field("NSW", description="Jurisdiction code, e.g., NSW, VIC") base_url: str = Field(TAB_BASE_URL_DEFAULT, description="Tabcorp API base URL") auto_refresh: bool = Field(True, description="Whether client should refresh tokens when close to expiry") @smithery.server(config_schema=ConfigSchema) def create_server() -> FastMCP: server = FastMCP("Tabcorp API Server") def _oauth_post(base_url: str, data: Dict[str, str]) -> Dict[str, Any]: url = base_url.rstrip("/") + OAUTH_TOKEN_PATH headers = {"content-type": "application/x-www-form-urlencoded", "user-agent": USER_AGENT} with httpx.Client(timeout=30.0) as client: resp = client.post(url, data=data, headers=headers) try: resp.raise_for_status() except httpx.HTTPStatusError as e: # Attempt to parse JSON error err: Dict[str, Any] try: err = resp.json() except Exception: err = {"error": resp.text} raise ValueError( f"OAuth request failed with status {resp.status_code}: {err}" ) from e try: return resp.json() except Exception as e: raise ValueError("OAuth response is not valid JSON") from e def _bearer_get(base_url: str, path: str, token: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: url = base_url.rstrip("/") + (path if path.startswith("/") else f"/{path}") headers = {"authorization": f"Bearer {token}", "user-agent": USER_AGENT} with httpx.Client(timeout=30.0) as client: resp = client.get(url, headers=headers, params=params or {}) try: resp.raise_for_status() except httpx.HTTPStatusError as e: try: err = resp.json() except Exception: err = {"error": resp.text} raise ValueError( f"GET {path} failed with status {resp.status_code}: {err}" ) from e try: return resp.json() except Exception as e: raise ValueError("GET response is not valid JSON") from e def _bearer_post(base_url: str, path: str, token: str, json_body: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: url = base_url.rstrip("/") + (path if path.startswith("/") else f"/{path}") headers = {"authorization": f"Bearer {token}", "user-agent": USER_AGENT, "content-type": "application/json"} with httpx.Client(timeout=30.0) as client: resp = client.post(url, headers=headers, json=json_body or {}) try: resp.raise_for_status() except httpx.HTTPStatusError as e: try: err = resp.json() except Exception: err = {"error": resp.text} raise ValueError( f"POST {path} failed with status {resp.status_code}: {err}" ) from e try: return resp.json() except Exception as e: raise ValueError("POST response is not valid JSON") from e # Tools @server.tool() def tab_oauth_password_grant( ctx: Context, client_id: Optional[str] = None, client_secret: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, base_url: Optional[str] = None, ) -> Dict[str, Any]: """Obtain bearer access_token and refresh_token via password grant. Pulls missing values from session config when not provided explicitly. """ cfg: ConfigSchema = ctx.session_config cid = client_id or cfg.client_id csec = client_secret or cfg.client_secret user = username or cfg.username pwd = password or cfg.password burl = base_url or cfg.base_url or TAB_BASE_URL_DEFAULT if not all([cid, csec, user, pwd]): missing = [name for name, val in [ ("client_id", cid), ("client_secret", csec), ("username", user), ("password", pwd) ] if not val] raise ValueError(f"Missing required fields for password grant: {', '.join(missing)}") data = { "grant_type": "password", "client_id": cid, "client_secret": csec, "username": user, "password": pwd, } out = _oauth_post(burl, data) # Add expires_at convenience timestamp (buffer 60s) expires_in = int(out.get("expires_in", 0) or 0) out["expires_at"] = int(time.time()) + max(0, expires_in - 60) out["base_url"] = burl return out @server.tool() def tab_oauth_refresh( ctx: Context, refresh_token: Optional[str] = None, client_id: Optional[str] = None, client_secret: Optional[str] = None, base_url: Optional[str] = None, ) -> Dict[str, Any]: """Refresh access token using refresh_token flow. Missing values are sourced from session config.""" cfg: ConfigSchema = ctx.session_config rtok = refresh_token or cfg.refresh_token cid = client_id or cfg.client_id csec = client_secret or cfg.client_secret burl = base_url or cfg.base_url or TAB_BASE_URL_DEFAULT if not all([cid, csec, rtok]): missing = [name for name, val in [("client_id", cid), ("client_secret", csec), ("refresh_token", rtok)] if not val] raise ValueError(f"Missing required fields for refresh_token grant: {', '.join(missing)}") data = { "grant_type": "refresh_token", "client_id": cid, "client_secret": csec, "refresh_token": rtok, } out = _oauth_post(burl, data) expires_in = int(out.get("expires_in", 0) or 0) out["expires_at"] = int(time.time()) + max(0, expires_in - 60) out["base_url"] = burl return out @server.tool() def tab_oauth_client_credentials( ctx: Context, client_id: Optional[str] = None, client_secret: Optional[str] = None, base_url: Optional[str] = None, ) -> Dict[str, Any]: """Obtain token using client_credentials grant (for non-account queries).""" cfg: ConfigSchema = ctx.session_config cid = client_id or cfg.client_id csec = client_secret or cfg.client_secret burl = base_url or cfg.base_url or TAB_BASE_URL_DEFAULT if not all([cid, csec]): missing = [name for name, val in [("client_id", cid), ("client_secret", csec)] if not val] raise ValueError(f"Missing required fields for client_credentials: {', '.join(missing)}") data = { "grant_type": "client_credentials", "client_id": cid, "client_secret": csec, } out = _oauth_post(burl, data) expires_in = int(out.get("expires_in", 0) or 0) out["expires_at"] = int(time.time()) + max(0, expires_in - 60) out["base_url"] = burl return out @server.tool() def tab_get( ctx: Context, access_token: str, path: str, params: Optional[Dict[str, Any]] = None, jurisdiction: Optional[str] = None, base_url: Optional[str] = None, ) -> Dict[str, Any]: """Generic GET helper with Bearer token. path: e.g., "/v1/tab-info-service/sports" or "v1/tab-info-service/sports" params: dict of query parameters; jurisdiction defaults from session config if not provided """ cfg: ConfigSchema = ctx.session_config burl = base_url or cfg.base_url or TAB_BASE_URL_DEFAULT j = jurisdiction or cfg.jurisdiction or "NSW" p = dict(params or {}) # include jurisdiction if not explicitly provided if "jurisdiction" not in p: p["jurisdiction"] = j return _bearer_get(burl, path, access_token, p) @server.tool() def tab_post( ctx: Context, access_token: str, path: str, body: Optional[Dict[str, Any]] = None, base_url: Optional[str] = None, ) -> Dict[str, Any]: """Generic POST helper with Bearer token for endpoints like placing bets.""" cfg: ConfigSchema = ctx.session_config burl = base_url or cfg.base_url or TAB_BASE_URL_DEFAULT return _bearer_post(burl, path, access_token, body or {}) @server.tool() def tab_list_sports( ctx: Context, access_token: str, jurisdiction: Optional[str] = None, base_url: Optional[str] = None, ) -> Dict[str, Any]: """Convenience wrapper for GET /v1/tab-info-service/sports?jurisdiction=...""" cfg: ConfigSchema = ctx.session_config burl = base_url or cfg.base_url or TAB_BASE_URL_DEFAULT j = jurisdiction or cfg.jurisdiction or "NSW" return _bearer_get(burl, "/v1/tab-info-service/sports", access_token, {"jurisdiction": j}) # Keep a simple hello tool for sanity checks @server.tool() def hello(ctx: Context, name: str) -> str: cfg: ConfigSchema = ctx.session_config return f"Hello, {name}! (jurisdiction={cfg.jurisdiction})" return server

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bencousins22/tab-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server