get-schema
Retrieve column names from a Coupler.io data flow schema by specifying the data flow ID and execution ID.
Instructions
Get data table schema from a Coupler.io data flow. Get column names from columnName properties in column definitions. Example: {"columns":[{"key":"Row Updated At.0","label":"Row Updated At","schema":{"type":"string"},"typeOptions":{},"columnName":"col_0"},{"key":"Dimension: Source.0","label":"Dimension: Source","schema":{"type":"string"},"typeOptions":{},"columnName":"col_1"}]}. Here the columns are col_0 and col_1.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dataflowId | Yes | The ID of the data flow with a successful run | |
| executionId | Yes | The ID of the last successful run (execution) of the data flow. |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| schema | Yes |
Implementation Reference
- src/tools/get-schema/handler.ts:18-48 (handler)Main handler function for the get-schema tool. Validates input params, retrieves the schema file via FileManager, assigns column names (col_0, col_1, ...), and returns the schema as structured content.
export const handler = async (params?: Record<string, unknown>): Promise<CallToolResult> => { const validationResult = zodSchema.safeParse(params) if (!validationResult.success) { const error = fromError(validationResult.error) logger.error(`Invalid parameters for get-schema tool: ${error.toString()}`) return textResponse({ text: `Invalid parameters for get-schema tool. ${error.toString()}`, isError: true, }) } const fileManager = new FileManager(validationResult.data) let schemaPath: string try { schemaPath = await fileManager.getFile('schema') } catch (e) { logger.error(`Failed to get dataflow schema file: ${e}`) return textResponse({ text: `Failed to get dataflow ${validationResult.data.dataflowId} schema file. ${e}`, isError: true }) } const schema = JSON.parse(readFileSync(schemaPath, 'utf-8')) schema.columns.forEach((col: ColumnDefinition, index: number) => { col.columnName = `col_${index}` }) return textResponse({ text: JSON.stringify(schema, null, 2), structuredContent: { schema } }) } - src/tools/get-schema/schema.ts:4-31 (schema)Input/output Zod schemas for get-schema tool. Input requires dataflowId and executionId (non-empty, no whitespace). Output defines a schema object with columns array including key, label, columnName, schema, and optional typeOptions.
export const zodSchema = z.object({ dataflowId: z .string() .min(1, 'dataflowId is required') .regex(/^\S+$/, 'dataflowId must not contain whitespace') .describe('The ID of the data flow with a successful run'), executionId: z .string() .min(1, 'executionId is required') .regex(/^\S+$/, 'executionId must be a non-empty string') .describe('The ID of the last successful run (execution) of the data flow.'), }).strict() export const inputSchema = zodToJsonSchema(zodSchema) const zodOutputSchema = z.object({ schema: z.object({ columns: z.array(z.object({ key: z.string(), label: z.string().describe('Human readable column label.'), columnName: z.string().describe('The actual column name in the SQLite database.'), schema: z.record(z.unknown()), typeOptions: z.record(z.unknown()).optional().describe('Additional options for the column type.'), })) }) }) export const outputSchema = zodToJsonSchema(zodOutputSchema) - src/tools/get-schema/index.ts:1-18 (registration)Registration/index file for the get-schema tool. Exports the handler, name, description, and a toolListEntry object that ties together name, description, input/output schemas, and annotations.
import { inputSchema, outputSchema } from './schema.js' export { handler } from './handler.js' export const name = 'get-schema' export const description = 'Get data table schema from a Coupler.io data flow. Get column names from `columnName` properties in column definitions. Example: {"columns":[{"key":"Row Updated At.0","label":"Row Updated At","schema":{"type":"string"},"typeOptions":{},"columnName":"col_0"},{"key":"Dimension: Source.0","label":"Dimension: Source","schema":{"type":"string"},"typeOptions":{},"columnName":"col_1"}]}. Here the columns are `col_0` and `col_1`.' const annotations = { title: 'Get data schema from a Coupler.io data flow.', idempotentHint: true, } export const toolListEntry = { name, description, inputSchema, outputSchema, annotations, } - src/server/index.ts:6-48 (registration)Server registration of get-schema tool. Imported from tools/get-schema/index.ts, added to TOOL_MAP for dispatch, and listed in ListToolsRequestSchema handler for discovery.
import * as getSchema from '../tools/get-schema/index.js' import * as listDataflows from '../tools/list-dataflows/index.js' import * as getDataflow from '../tools/get-dataflow/index.js' const TOOL_MAP = { [getData.name]: getData.handler, [getSchema.name]: getSchema.handler, [listDataflows.name]: listDataflows.handler, [getDataflow.name]: getDataflow.handler, } export const server = new Server({ name: 'Coupler.io MCP server', version: '0.0.5', }, { capabilities: { tools: {}, logging: {} } }) // Look up the tool by name in TOOL_MAP and call its handler server.setRequestHandler(CallToolRequestSchema, async (request) => { const handler = TOOL_MAP[request.params.name as keyof typeof TOOL_MAP] if (!handler) { throw new Error(`Handler for tool "${request.params.name}" not found`) } return await handler(request.params.arguments) }) // List all tools server.setRequestHandler( ListToolsRequestSchema, async () => ({ tools: [ getData.toolListEntry, getSchema.toolListEntry, listDataflows.toolListEntry, getDataflow.toolListEntry, ] }) ) - FileManager helper class used by the get-schema handler to retrieve the schema file (schema.json) either from cache or by downloading via a signed URL from the Coupler.io API.
export class FileManager { readonly dataflowId: string readonly executionId: string readonly coupler: CouplerioClient constructor({ dataflowId, executionId, Client = CouplerioClient }: { dataflowId: string, executionId: string, Client?: typeof CouplerioClient }) { this.dataflowId = dataflowId this.executionId = executionId this.coupler = new Client({ auth: COUPLER_ACCESS_TOKEN }) } initStorage() { mkdirSync(path.join(DOWNLOAD_DIR, this.dataflowId, this.executionId), { recursive: true }) } /** * * @throws {Error} If the file does not exist yet and can't be downloaded */ async getFile(fileType: keyof typeof DataflowFile): Promise<string> { const filePath = this.buildFilePath(fileType) if (existsSync(filePath)) { return filePath } const fileUrl = await this.getFileUrl(fileType) return await this.downloadFile(fileUrl, fileType) } /** * * @throws {Error} If the file can't be downloaded or written */ async downloadFile(url: string, fileType: keyof typeof DataflowFile): Promise<string> { await this.initStorage() const fileResponse = await fetch(url) const filePath = this.buildFilePath(fileType) if (!fileResponse.ok) { throw new Error(`Failed to download file. Response status: ${fileResponse.status}`) } const data = Buffer.from(await fileResponse.arrayBuffer()) writeFileSync(filePath, data) return filePath } buildFilePath(fileType: keyof typeof DataflowFile): string { const fileName = fileType === 'sqlite' ? DataflowFile.sqlite.name : DataflowFile.schema.name return path.join(DOWNLOAD_DIR, this.dataflowId, this.executionId, fileName) } /** * * @throws {Error} If the request fails */ async getFileUrl(fileType: keyof typeof DataflowFile): Promise<string> { const query = new URLSearchParams({ execution_id: this.executionId, }) const response = await this.coupler.request( `/dataflows/{dataflowId}/signed_url?${query}`, { expand: { dataflowId: this.dataflowId }, request: { method: 'POST', body: JSON.stringify({ file: fileType }) }, } ) if (!response.ok) { throw new Error(`Failed to get ${fileType} file signed URL for dataflow ID ${this.dataflowId}. Response status: ${response.status}`) } const { signed_url: signedUrl } = await response.json() as SignedUrlDto return signedUrl } }