get-data
Run SQL queries on a Coupler.io data flow execution to retrieve specific data.
Instructions
Get data from a Coupler.io data flow run. Make sure to first query a sample of 5 rows from data table, e.g. SELECT * from data LIMIT 5, and then run the get-schema tool, to better understand the structure. The get-schema tool will return the JSON-encoded schema of the data table. When visualizing the data, do not try to read any files or fetch any URLs, just generate a static page and use the data you get from the tools.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dataflowId | Yes | The ID of the data flow with a successful run | |
| executionId | Yes | The ID of the last successful run (execution) of the data flow. | |
| query | Yes | The SQL query to run on the data flow sqlite file. |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| data | Yes | The data returned from the query. |
Implementation Reference
- src/tools/get-data/handler.ts:1-48 (handler)The main handler function for the 'get-data' tool. It validates input params with Zod schema, downloads/gets the sqlite file via FileManager, executes the user-provided SQL query on it using node:sqlite, and returns the result as JSON.
import { DatabaseSync as Database } from 'node:sqlite' import type { CallToolResult } from '@modelcontextprotocol/sdk/types.js' import { fromError } from 'zod-validation-error' import { textResponse } from '../../util/tool-response.js' import { FileManager } from '../shared/file-manager.js' import { logger } from '../../logger/index.js' import { zodInputSchema } from './schema.js' export const handler = async (params?: Record<string, unknown>): Promise<CallToolResult> => { const validationResult = zodInputSchema.safeParse(params) if (!validationResult.success) { const error = fromError(validationResult.error) logger.error(`Invalid parameters for get-data tool: ${error.toString()}`) return textResponse({ text: `Invalid parameters for get-data tool. ${error.toString()}`, isError: true, }) } const fileManager = new FileManager(validationResult.data) let sqlitePath: string try { sqlitePath = await fileManager.getFile('sqlite') } catch (e) { return textResponse({ text: `Failed to get data flow ${validationResult.data.dataflowId} sqlite file. ${e}`, isError: true }) } const db = new Database(sqlitePath, { readOnly: true }) let statement, queryResult try { statement = db.prepare(validationResult.data.query) queryResult = statement.all() } catch (e) { return textResponse({ text: `Failed to execute query: ${e}`, isError: true }) } finally { db.close() } return textResponse({ text: JSON.stringify(queryResult, null, 2), structuredContent: { data: queryResult } }) } - src/tools/get-data/schema.ts:1-27 (schema)Defines the Zod input schema (dataflowId, executionId, query) and output schema for the get-data tool. Input enforces query must start with SELECT. Output expects an array of records.
import { z } from 'zod' import { zodToJsonSchema } from 'zod-to-json-schema' export const zodInputSchema = z.object({ dataflowId: z.string() .min(1, 'dataflowId is required') .regex(/^\S+$/, 'dataflowId must not contain whitespace') .describe('The ID of the data flow with a successful run'), executionId: z.string() .min(1, 'executionId is required') .regex(/^\S+$/, 'executionId must be a non-empty string') .describe('The ID of the last successful run (execution) of the data flow.'), query: z.string() .min(1, 'query is required') .regex(/^SELECT.*?/, 'must start with "SELECT"') .describe('The SQL query to run on the data flow sqlite file.'), }).strict() export const inputSchema = zodToJsonSchema(zodInputSchema) const zodOutputSchema = z.object({ data: z.array( z.record(z.unknown()) ).describe('The data returned from the query.'), }) export const outputSchema = zodToJsonSchema(zodOutputSchema) - src/tools/get-data/index.ts:1-19 (registration)Exports the tool name ('get-data'), description, input/output schemas, and re-exports the handler. This is the tool registration entry point.
import { inputSchema, outputSchema } from './schema.js' export { handler } from './handler.js' export const name = 'get-data' export const description = 'Get data from a Coupler.io data flow run. Make sure to first query a sample of 5 rows from `data` table, e.g. `SELECT * from data LIMIT 5`, and then run the `get-schema` tool, to better understand the structure. The `get-schema` tool will return the JSON-encoded schema of the `data` table. When visualizing the data, do not try to read any files or fetch any URLs, just generate a static page and use the data you get from the tools.' const annotations = { title: 'Get and query data from a Coupler.io data flow.', idempotentHint: true, } export const toolListEntry = { name, description, inputSchema, outputSchema, annotations, } - src/server/index.ts:5-5 (registration)Imports the get-data tool module into the server. Also line 11 maps getData.name ('get-data') to getData.handler in TOOL_MAP, and line 42 adds getData.toolListEntry to the tool listing.
import * as getData from '../tools/get-data/index.js' - FileManager helper class used by the handler to download and cache the sqlite file for a dataflow execution before querying it.
export class FileManager { readonly dataflowId: string readonly executionId: string readonly coupler: CouplerioClient constructor({ dataflowId, executionId, Client = CouplerioClient }: { dataflowId: string, executionId: string, Client?: typeof CouplerioClient }) { this.dataflowId = dataflowId this.executionId = executionId this.coupler = new Client({ auth: COUPLER_ACCESS_TOKEN }) } initStorage() { mkdirSync(path.join(DOWNLOAD_DIR, this.dataflowId, this.executionId), { recursive: true }) } /** * * @throws {Error} If the file does not exist yet and can't be downloaded */ async getFile(fileType: keyof typeof DataflowFile): Promise<string> { const filePath = this.buildFilePath(fileType) if (existsSync(filePath)) { return filePath } const fileUrl = await this.getFileUrl(fileType) return await this.downloadFile(fileUrl, fileType) } /** * * @throws {Error} If the file can't be downloaded or written */ async downloadFile(url: string, fileType: keyof typeof DataflowFile): Promise<string> { await this.initStorage() const fileResponse = await fetch(url) const filePath = this.buildFilePath(fileType) if (!fileResponse.ok) { throw new Error(`Failed to download file. Response status: ${fileResponse.status}`) } const data = Buffer.from(await fileResponse.arrayBuffer()) writeFileSync(filePath, data) return filePath } buildFilePath(fileType: keyof typeof DataflowFile): string { const fileName = fileType === 'sqlite' ? DataflowFile.sqlite.name : DataflowFile.schema.name return path.join(DOWNLOAD_DIR, this.dataflowId, this.executionId, fileName) } /** * * @throws {Error} If the request fails */ async getFileUrl(fileType: keyof typeof DataflowFile): Promise<string> { const query = new URLSearchParams({ execution_id: this.executionId, }) const response = await this.coupler.request( `/dataflows/{dataflowId}/signed_url?${query}`, { expand: { dataflowId: this.dataflowId }, request: { method: 'POST', body: JSON.stringify({ file: fileType }) }, } ) if (!response.ok) { throw new Error(`Failed to get ${fileType} file signed URL for dataflow ID ${this.dataflowId}. Response status: ${response.status}`) } const { signed_url: signedUrl } = await response.json() as SignedUrlDto return signedUrl } }