# mcp_powerdrill_create_data_source_from_local_file
Uploads a local file to a Powerdrill dataset for AI analysis. Specify the dataset ID and file path, plus an optional custom name and chunk size, to create a data source.
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dataset_id | Yes | The ID of the dataset to create the data source in | |
| file_path | Yes | The local path to the file to upload | |
| file_name | No | Optional custom name for the file | Original filename |
| chunk_size | No | Size of each chunk in bytes | 5242880 (5 MB) |
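A call to this tool might pass arguments like the following (the dataset ID and file path are placeholders):

```json
{
  "dataset_id": "<your-dataset-id>",
  "file_path": "/path/to/sales_data.csv",
  "file_name": "sales_data.csv",
  "chunk_size": 5242880
}
```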
## Implementation Reference
- **src/index.ts:603-612 (registration)**: registers the MCP tool `mcp_powerdrill_create_data_source_from_local_file` with its Zod input schema and an inline handler function.

  ```typescript
  // Register the createDataSourceFromLocalFile tool
  server.tool(
    'mcp_powerdrill_create_data_source_from_local_file',
    {
      dataset_id: z.string().describe('The ID of the dataset to create the data source in'),
      file_path: z.string().describe('The local path to the file to upload'),
      file_name: z.string().optional().describe('Optional custom name for the file, defaults to the original filename'),
      chunk_size: z.number().optional().default(5 * 1024 * 1024).describe('Size of each chunk in bytes, default is 5MB')
    },
    async (args, extra) => {
      // ... handler body (see the handler entry below)
  ```
- **src/index.ts:606-611 (schema)**: Zod schema defining the input parameters for the tool: `dataset_id`, `file_path`, and the optional `file_name` and `chunk_size`.

  ```typescript
  {
    dataset_id: z.string().describe('The ID of the dataset to create the data source in'),
    file_path: z.string().describe('The local path to the file to upload'),
    file_name: z.string().optional().describe('Optional custom name for the file, defaults to the original filename'),
    chunk_size: z.number().optional().default(5 * 1024 * 1024).describe('Size of each chunk in bytes, default is 5MB')
  },
  ```
- **src/index.ts:612-799 (handler)**: the primary handler; it uploads the local file to Powerdrill in chunks via multipart upload, creates a data source in the specified dataset, and polls until the data source is synched. A worked example of the Step 2 byte-range arithmetic follows the snippet.

  ```typescript
  async (args, extra) => {
    try {
      const { dataset_id, file_path, file_name, chunk_size } = args;

      // Import required modules
      const fs = await import('fs');
      const path = await import('path');
      const axios = await import('axios');

      // Validate file existence
      if (!fs.existsSync(file_path)) {
        throw new Error(`File not found: ${file_path}`);
      }

      // Get file stats
      const stats = fs.statSync(file_path);
      const fileSize = stats.size;

      // Determine file name if not provided
      const actualFileName = file_name || path.basename(file_path);

      // Initialize Powerdrill client
      const client = new (await import('./utils/powerdrillClient.js')).PowerdrillClient();

      // Helper function to read a file in chunks
      const readFileChunk = (filePath: string, start: number, end: number): Promise<Buffer> => {
        return new Promise((resolve, reject) => {
          const readStream = fs.createReadStream(filePath, { start, end });
          const chunks: Buffer[] = [];

          readStream.on('data', (chunk) => {
            if (Buffer.isBuffer(chunk)) {
              chunks.push(chunk);
            } else {
              chunks.push(Buffer.from(chunk));
            }
          });
          readStream.on('error', (err) => reject(err));
          readStream.on('end', () => resolve(Buffer.concat(chunks)));
        });
      };

      // Helper function to upload a file chunk and get its ETag
      const uploadFileChunk = async (url: string, chunk: Buffer): Promise<string> => {
        try {
          const response = await axios.default.put(url, chunk, {
            headers: { 'Content-Type': 'application/octet-stream' }
          });
          // Extract ETag from response headers, remove quotes if present
          const etag = response.headers.etag || '';
          return etag.replace(/"/g, '');
        } catch (error: any) {
          console.error('Error uploading file chunk:', error.message);
          throw error;
        }
      };

      // Helper function to poll a data source until it's synched or fails
      const pollDataSourceStatus = async (datasetId: string, dataSourceId: string, maxAttempts: number = 20, delayMs: number = 3000) => {
        let attempts = 0;

        while (attempts < maxAttempts) {
          const response = await client.getDataSource(datasetId, dataSourceId);

          if (response.code !== 0) {
            throw new Error(`Error getting data source status: ${JSON.stringify(response)}`);
          }

          if (response.data.status === 'synched') {
            return response;
          }

          if (response.data.status === 'invalid') {
            throw new Error(`Data source processing failed with status: invalid`);
          }

          // Wait before the next attempt
          await new Promise(resolve => setTimeout(resolve, delayMs));
          attempts++;
        }

        throw new Error(`Timed out waiting for data source to be synched after ${maxAttempts} attempts`);
      };

      // Step 1: Initiate multipart upload
      const initUploadResponse = await client.initiateMultipartUpload({
        file_name: actualFileName,
        file_size: fileSize
      });

      if (initUploadResponse.code !== 0 || !initUploadResponse.data) {
        throw new Error(`Failed to initiate multipart upload: ${JSON.stringify(initUploadResponse)}`);
      }

      const { upload_id, file_object_key, part_items } = initUploadResponse.data;

      // Step 2: Upload each file part
      const partEtags = [];
      for (const part of part_items) {
        const startByte = (part.number - 1) * chunk_size;
        const endByte = Math.min(startByte + part.size - 1, fileSize - 1);

        // Read file chunk
        const chunk = await readFileChunk(file_path, startByte, endByte);

        // Upload chunk and get ETag
        const etag = await uploadFileChunk(part.upload_url, chunk);
        partEtags.push({ number: part.number, etag: etag });
      }

      // Step 3: Complete multipart upload
      const completeUploadResponse = await client.completeMultipartUpload({
        file_object_key,
        upload_id,
        part_etags: partEtags
      });

      if (completeUploadResponse.code !== 0 || !completeUploadResponse.data) {
        throw new Error(`Failed to complete multipart upload: ${JSON.stringify(completeUploadResponse)}`);
      }

      // Step 4: Create data source
      const createDataSourceResponse = await client.createDataSource(dataset_id, {
        name: actualFileName,
        type: 'FILE',
        file_object_key: completeUploadResponse.data.file_object_key
      });

      if (createDataSourceResponse.code !== 0 || !createDataSourceResponse.data) {
        throw new Error(`Failed to create data source: ${JSON.stringify(createDataSourceResponse)}`);
      }

      const dataSourceId = createDataSourceResponse.data.id;

      // Step 5: Poll until data source is synched
      const finalStatus = await pollDataSourceStatus(dataset_id, dataSourceId);

      // Format the response as MCP content
      return {
        content: [
          {
            type: "text",
            text: JSON.stringify({
              dataset_id,
              data_source: {
                id: dataSourceId,
                name: finalStatus.data.name,
                type: finalStatus.data.type,
                status: finalStatus.data.status,
                size: finalStatus.data.size
              },
              file: {
                name: actualFileName,
                size: fileSize,
                object_key: file_object_key
              }
            }, null, 2)
          }
        ]
      };
    } catch (error: any) {
      console.error(`Error creating data source from local file: ${error.message}`);
      console.error(error.stack);

      // Return error response
      return {
        content: [
          {
            type: "text",
            text: JSON.stringify({
              error: `Error creating data source from local file: ${error.message}`,
              errorType: error.name || 'UnknownError',
              errorStack: process.env.NODE_ENV === 'development' ? error.stack : undefined
            }, null, 2)
          }
        ],
        isError: true
      };
    }
  }
  ```
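  To make the Step 2 byte-range arithmetic concrete, here is a minimal standalone sketch (not from the source) that reproduces the handler's `startByte`/`endByte` math for a hypothetical 12 MB file with the default 5 MB chunk size:

  ```typescript
  // Hypothetical illustration of the handler's part-boundary arithmetic.
  const fileSize = 12 * 1024 * 1024;  // 12 MB file, assumed for this example
  const chunkSize = 5 * 1024 * 1024;  // default chunk_size
  const partCount = Math.ceil(fileSize / chunkSize);

  for (let number = 1; number <= partCount; number++) {
    // The last part is smaller when fileSize is not a multiple of chunkSize
    const size = Math.min(chunkSize, fileSize - (number - 1) * chunkSize);
    const startByte = (number - 1) * chunkSize;
    const endByte = Math.min(startByte + size - 1, fileSize - 1);
    console.log(`part ${number}: bytes ${startByte}-${endByte}`);
  }
  // part 1: bytes 0-5242879
  // part 2: bytes 5242880-10485759
  // part 3: bytes 10485760-12582911
  ```

  Note that `fs.createReadStream` treats its `end` option as inclusive, which is why the handler subtracts 1 when computing `endByte`.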
- **src/index.ts:673-697 (helper)**: helper function that polls the status of the created data source until it reaches the `synched` status or fails.

  ```typescript
  const pollDataSourceStatus = async (datasetId: string, dataSourceId: string, maxAttempts: number = 20, delayMs: number = 3000) => {
    let attempts = 0;

    while (attempts < maxAttempts) {
      const response = await client.getDataSource(datasetId, dataSourceId);

      if (response.code !== 0) {
        throw new Error(`Error getting data source status: ${JSON.stringify(response)}`);
      }

      if (response.data.status === 'synched') {
        return response;
      }

      if (response.data.status === 'invalid') {
        throw new Error(`Data source processing failed with status: invalid`);
      }

      // Wait before the next attempt
      await new Promise(resolve => setTimeout(resolve, delayMs));
      attempts++;
    }

    throw new Error(`Timed out waiting for data source to be synched after ${maxAttempts} attempts`);
  };
  ```
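  With the defaults (`maxAttempts = 20`, `delayMs = 3000`), polling accumulates up to roughly 60 seconds of delay, plus per-request time, before timing out.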
- **PowerdrillClient.createDataSource**: client method that creates a data source in a dataset using the uploaded file object key.

  ```typescript
  async createDataSource(datasetId: string, options: {
    name: string;
    type: string;
    file_object_key: string;
  }) {
    try {
      const requestBody = {
        ...options,
        user_id: this.config.userId
      };

      const response = await this.client.post(`/datasets/${datasetId}/datasources`, requestBody);
      return response.data;
    } catch (error: any) {
      console.error('Error creating data source:', error.message);
      throw error;
    }
  }
  ```
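  Given this method, the JSON body POSTed to `/datasets/{datasetId}/datasources` would look roughly like the following (all values are placeholders; `user_id` is filled in from the client's configuration):

  ```json
  {
    "name": "sales_data.csv",
    "type": "FILE",
    "file_object_key": "<file_object_key returned by the complete-upload call>",
    "user_id": "<configured Powerdrill user ID>"
  }
  ```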