markdown2pdf-mcp
by 2b3pro
- src
- mcp_tinybird
import httpx
import logging
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime
from functools import wraps
import traceback
from pathlib import Path
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("TinybirdClient")
def log_function_call(func):
@wraps(func)
async def wrapper(*args, **kwargs):
start_time = datetime.now()
function_name = func.__name__
# Log the function call
logger.info(f"Calling {function_name} with args: {args[1:]} kwargs: {kwargs}")
try:
result = await func(*args, **kwargs)
duration = (datetime.now() - start_time).total_seconds()
logger.info(f"Successfully completed {function_name} in {duration:.2f}s")
return result
except Exception as e:
duration = (datetime.now() - start_time).total_seconds()
logger.error(
f"Exception in {function_name} after {duration:.2f}s: {str(e)}\n"
f"Traceback:\n{traceback.format_exc()}"
)
raise
return wrapper
@dataclass
class Column:
name: str
type: str
codec: Optional[str]
default_value: Optional[str]
jsonpath: Optional[str]
nullable: bool
normalized_name: str
@dataclass
class Engine:
engine: str
engine_sorting_key: str
engine_partition_key: str
engine_primary_key: Optional[str]
@dataclass
class DataSource:
id: str
name: str
engine: Engine
columns: List[Column]
indexes: List[Any]
new_columns_detected: Dict[str, Any]
quarantine_rows: int
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "DataSource":
engine = Engine(**data["engine"])
columns = [Column(**col) for col in data["columns"]]
return cls(
id=data["id"],
name=data["name"],
engine=engine,
columns=columns,
indexes=data["indexes"],
new_columns_detected=data["new_columns_detected"],
quarantine_rows=data["quarantine_rows"],
)
@dataclass
class Pipe:
type: str
id: str
name: str
description: Optional[str]
endpoint: Optional[str]
url: str
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Pipe":
return cls(**data)
@dataclass
class PipeData:
meta: List[Dict[str, str]]
data: List[Dict[str, Any]]
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "PipeData":
return cls(**data)
class APIClient:
def __init__(self, api_url: str, token: str):
self.api_url = api_url.rstrip("/")
self.token = token
self.client = httpx.AsyncClient(
timeout=30.0,
headers={"Accept": "application/json", "User-Agent": "Python/APIClient"},
)
self.insights: list[str] = []
def _synthesize_memo(self) -> str:
if not self.insights:
return "No insights have been discovered yet."
insights = "\n".join(f"- {insight}" for insight in self.insights)
memo = "📊 Analysis Memo 📊\n\n"
memo += "Key Insights Discovered:\n\n"
memo += insights
if len(self.insights) > 1:
memo += "\nSummary:\n"
memo += f"Analysis has revealed {len(self.insights)} key insights."
return memo
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
async def close(self):
"""Close the underlying HTTP client."""
await self.client.aclose()
@log_function_call
async def _get(
self, endpoint: str, params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
if params is None:
params = {}
params["token"] = self.token
params["__tb__client"] = "mcp-tinybird"
url = f"{self.api_url}/{endpoint}"
response = await self.client.get(url, params=params)
try:
response.raise_for_status()
except Exception as e:
logger.error(f"Error in _get: {e}")
raise Exception(response.json().get("error", str(e))) from e
return response.json()
@log_function_call
async def _post(
self, endpoint: str, params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
if params is None:
params = {}
params["token"] = self.token
params["__tb__client"] = "mcp-tinybird"
url = f"{self.api_url}/{endpoint}"
response = await self.client.get(url, params=params)
response.raise_for_status()
return response.json()
async def list_data_sources(self) -> List[DataSource]:
"""List all available data sources."""
params = {"attrs": "id,name,description,columns"}
response = await self._get("v0/datasources", params)
return [DataSource.from_dict(ds) for ds in response["datasources"]]
async def get_data_source(self, datasource_id: str) -> Dict[str, Any]:
"""Get detailed information about a specific data source."""
params = {
"attrs": "columns",
}
return await self._get(f"v0/datasources/{datasource_id}", params)
async def list_pipes(self) -> List[Pipe]:
"""List all available pipes."""
params = {"attrs": "id,name,description,type,endpoint"}
response = await self._get("v0/pipes", params)
return [Pipe.from_dict(pipe) for pipe in response["pipes"]]
async def get_pipe(self, pipe_name: str) -> Dict[str, Any]:
"""Get detailed information about a specific pipe."""
return await self._get(f"v0/pipes/{pipe_name}")
async def get_pipe_data(self, pipe_name: str, **params) -> PipeData:
"""Get data from a pipe with optional parameters."""
response = await self._get(f"v0/pipes/{pipe_name}.json", params)
return PipeData.from_dict(
{key: response[key] for key in ["meta", "data"] if key in response}
)
async def run_select_query(self, query: str, **kwargs: Any) -> Dict[str, Any]:
"""Run a SQL SELECT query."""
kwargs = kwargs or {}
params = {"q": f"{query} FORMAT JSON", **kwargs}
return await self._get("v0/sql", params)
async def llms(self, query: str) -> Dict[str, Any]:
url = "https://www.tinybird.co/docs/llms-full.txt"
async with httpx.AsyncClient() as client:
response = await client.get(url)
response.raise_for_status()
return response.text
async def explain(self, pipe_name: str) -> Dict[str, Any]:
endpoint = f"v0/pipes/{pipe_name}/explain"
return await self._get(endpoint)
async def save_event(self, datasource_name: str, data: str):
url = f"{self.api_url}/v0/events"
params = {"name": datasource_name, "token": self.token}
try:
response = await self.client.post(url, params=params, data=data)
response.raise_for_status()
return response.text
except Exception as e:
raise ValueError(str(e))
async def push_datafile(self, files: str):
url = f"{self.api_url}/v0/datafiles"
file_path = Path(files)
files_dict = {
file_path.name: (
file_path.name,
file_path.open("rb"),
"application/octet-stream",
)
}
params = {
"filenames": file_path.name,
"force": "True",
"dry_run": "False",
"token": self.token,
}
response = await self.client.post(url, params=params, files=files_dict)
response.raise_for_status()
return response.text