schema_reference
Retrieve the database schema for Logfire telemetry data, including table structures and column types, to construct efficient SQL queries.
Instructions
The database schema for the Logfire DataFusion database.
This includes all tables, columns, and their types as well as descriptions.
For example:
```sql
-- The records table contains spans and logs.
CREATE TABLE records (
message TEXT, -- The message of the record
span_name TEXT, -- The name of the span, message is usually templated from this
trace_id TEXT, -- The trace ID, identifies a group of spans in a trace
exception_type TEXT, -- The type of the exception
exception_message TEXT, -- The message of the exception
-- other columns...
);
```
The SQL syntax is similar to Postgres, although the query engine is actually Apache DataFusion.
To access nested JSON fields e.g. in the `attributes` column use the `->` and `->>` operators.
You may need to cast the result of these operators e.g. `(attributes->'cost')::float + 10`.
You should apply as much filtering as reasonable to reduce the amount of data queried.
Filters on `start_timestamp`, `service_name`, `span_name`, `metric_name`, `trace_id` are efficient.Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes |
Implementation Reference
- logfire_mcp/main.py:77-130 (handler)The schema_reference async function that executes the tool logic. It fetches the database schema from the Logfire API (/v1/schemas) and converts it to SQL CREATE TABLE statements.
async def schema_reference(ctx: Context[ServerSession, MCPState]) -> str: """The database schema for the Logfire DataFusion database. This includes all tables, columns, and their types as well as descriptions. For example: ```sql -- The records table contains spans and logs. CREATE TABLE records ( message TEXT, -- The message of the record span_name TEXT, -- The name of the span, message is usually templated from this trace_id TEXT, -- The trace ID, identifies a group of spans in a trace exception_type TEXT, -- The type of the exception exception_message TEXT, -- The message of the exception -- other columns... ); ``` The SQL syntax is similar to Postgres, although the query engine is actually Apache DataFusion. To access nested JSON fields e.g. in the `attributes` column use the `->` and `->>` operators. You may need to cast the result of these operators e.g. `(attributes->'cost')::float + 10`. You should apply as much filtering as reasonable to reduce the amount of data queried. Filters on `start_timestamp`, `service_name`, `span_name`, `metric_name`, `trace_id` are efficient. """ logfire_client = ctx.request_context.lifespan_context.logfire_client response = await logfire_client.client.get('/v1/schemas') schema_data = response.json() def schema_to_sql(schema_json: dict[str, Any]) -> str: sql_commands: list[str] = [] for table in schema_json.get('tables', []): table_name = table['name'] columns: list[str] = [] for col_name, col_info in table['schema'].items(): data_type = col_info['data_type'] nullable = col_info.get('nullable', True) description = col_info.get('description', '').strip() column_def = f'{col_name} {data_type}' if not nullable: column_def += ' NOT NULL' if description: column_def += f' -- {description}' columns.append(column_def) create_table = f'CREATE TABLE {table_name} (\n ' + ',\n '.join(columns) + '\n);' sql_commands.append(create_table) return '\n\n'.join(sql_commands) return schema_to_sql(schema_data) - logfire_mcp/main.py:162-162 (registration)Registration of the schema_reference tool with the FastMCP server via mcp.tool()(schema_reference).
mcp.tool()(schema_reference) - logfire_mcp/main.py:106-128 (helper)The schema_to_sql nested helper function that transforms schema JSON data into SQL CREATE TABLE statements.
def schema_to_sql(schema_json: dict[str, Any]) -> str: sql_commands: list[str] = [] for table in schema_json.get('tables', []): table_name = table['name'] columns: list[str] = [] for col_name, col_info in table['schema'].items(): data_type = col_info['data_type'] nullable = col_info.get('nullable', True) description = col_info.get('description', '').strip() column_def = f'{col_name} {data_type}' if not nullable: column_def += ' NOT NULL' if description: column_def += f' -- {description}' columns.append(column_def) create_table = f'CREATE TABLE {table_name} (\n ' + ',\n '.join(columns) + '\n);' sql_commands.append(create_table) return '\n\n'.join(sql_commands)