
Powerdrill MCP Server

Official, by powerdrillai

mcp_powerdrill_create_data_source_from_local_file

Upload a local file to a Powerdrill dataset for AI analysis. Specify the dataset ID and file path, plus an optional custom file name and chunk size, to create the data source.

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| dataset_id | Yes | The ID of the dataset to create the data source in | — |
| file_path | Yes | The local path to the file to upload | — |
| file_name | No | Custom name for the file | Original filename |
| chunk_size | No | Size of each upload chunk in bytes | 5 MB (5 * 1024 * 1024 bytes) |
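
For example, a client might invoke this tool with arguments like the following; the dataset ID and file paths are illustrative, not real values:

    // Hypothetical example arguments (IDs and paths are made up)
    const exampleArgs = {
      dataset_id: 'dset-cm5axxxxxxxxx',   // required: target dataset
      file_path: '/tmp/sales-2024.csv',   // required: local file to upload
      file_name: 'sales-2024.csv',        // optional: defaults to the original filename
      chunk_size: 5 * 1024 * 1024         // optional: defaults to 5 MB
    };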

Implementation Reference

  • src/index.ts:603-612 (registration)
    Registration of the MCP tool 'mcp_powerdrill_create_data_source_from_local_file' with a Zod input schema and an inline handler function.
    // Register the createDataSourceFromLocalFile tool
    server.tool(
      'mcp_powerdrill_create_data_source_from_local_file',
      {
        dataset_id: z.string().describe('The ID of the dataset to create the data source in'),
        file_path: z.string().describe('The local path to the file to upload'),
        file_name: z.string().optional().describe('Optional custom name for the file, defaults to the original filename'),
        chunk_size: z.number().optional().default(5 * 1024 * 1024).describe('Size of each chunk in bytes, default is 5MB')
      },
      async (args, extra) => {
  • The primary handler function: it uploads the local file to Powerdrill in chunks via multipart upload, creates a data source in the specified dataset, and polls until the data source's status is 'synched'.
    async (args, extra) => {
      try {
        const { dataset_id, file_path, file_name, chunk_size } = args;
    
        // Import required modules
        const fs = await import('fs');
        const path = await import('path');
        const axios = await import('axios');
    
        // Validate file existence
        if (!fs.existsSync(file_path)) {
          throw new Error(`File not found: ${file_path}`);
        }
    
        // Get file stats
        const stats = fs.statSync(file_path);
        const fileSize = stats.size;
    
        // Determine file name if not provided
        const actualFileName = file_name || path.basename(file_path);
    
        // Initialize Powerdrill client
        const client = new (await import('./utils/powerdrillClient.js')).PowerdrillClient();
    
        // Helper function to read a file in chunks
        const readFileChunk = (filePath: string, start: number, end: number): Promise<Buffer> => {
          return new Promise((resolve, reject) => {
            const readStream = fs.createReadStream(filePath, { start, end });
            const chunks: Buffer[] = [];
    
            readStream.on('data', (chunk) => {
              if (Buffer.isBuffer(chunk)) {
                chunks.push(chunk);
              } else {
                chunks.push(Buffer.from(chunk));
              }
            });
            readStream.on('error', (err) => reject(err));
            readStream.on('end', () => resolve(Buffer.concat(chunks)));
          });
        };
    
        // Helper function to upload a file chunk and get its ETag
        const uploadFileChunk = async (url: string, chunk: Buffer): Promise<string> => {
          try {
            const response = await axios.default.put(url, chunk, {
              headers: {
                'Content-Type': 'application/octet-stream'
              }
            });
    
            // Extract ETag from response headers, remove quotes if present
            const etag = response.headers.etag || '';
            return etag.replace(/"/g, '');
          } catch (error: any) {
            console.error('Error uploading file chunk:', error.message);
            throw error;
          }
        };
    
        // Helper function to poll a data source until it's synched or fails
        const pollDataSourceStatus = async (datasetId: string, dataSourceId: string, maxAttempts: number = 20, delayMs: number = 3000) => {
          let attempts = 0;
    
          while (attempts < maxAttempts) {
            const response = await client.getDataSource(datasetId, dataSourceId);
    
            if (response.code !== 0) {
              throw new Error(`Error getting data source status: ${JSON.stringify(response)}`);
            }
    
            if (response.data.status === 'synched') {
              return response;
            }
    
            if (response.data.status === 'invalid') {
              throw new Error(`Data source processing failed with status: invalid`);
            }
    
            // Wait before the next attempt
            await new Promise(resolve => setTimeout(resolve, delayMs));
            attempts++;
          }
    
          throw new Error(`Timed out waiting for data source to be synched after ${maxAttempts} attempts`);
        };
    
        // Step 1: Initiate multipart upload
        const initUploadResponse = await client.initiateMultipartUpload({
          file_name: actualFileName,
          file_size: fileSize
        });
    
        if (initUploadResponse.code !== 0 || !initUploadResponse.data) {
          throw new Error(`Failed to initiate multipart upload: ${JSON.stringify(initUploadResponse)}`);
        }
    
        const { upload_id, file_object_key, part_items } = initUploadResponse.data;
    
        // Step 2: Upload each file part
        const partEtags = [];
    
        for (const part of part_items) {
          const startByte = (part.number - 1) * chunk_size;
          const endByte = Math.min(startByte + part.size - 1, fileSize - 1);
    
          // Read file chunk
          const chunk = await readFileChunk(file_path, startByte, endByte);
    
          // Upload chunk and get ETag
          const etag = await uploadFileChunk(part.upload_url, chunk);
    
          partEtags.push({
            number: part.number,
            etag: etag
          });
        }
    
        // Step 3: Complete multipart upload
        const completeUploadResponse = await client.completeMultipartUpload({
          file_object_key,
          upload_id,
          part_etags: partEtags
        });
    
        if (completeUploadResponse.code !== 0 || !completeUploadResponse.data) {
          throw new Error(`Failed to complete multipart upload: ${JSON.stringify(completeUploadResponse)}`);
        }
    
        // Step 4: Create data source
        const createDataSourceResponse = await client.createDataSource(dataset_id, {
          name: actualFileName,
          type: 'FILE',
          file_object_key: completeUploadResponse.data.file_object_key
        });
    
        if (createDataSourceResponse.code !== 0 || !createDataSourceResponse.data) {
          throw new Error(`Failed to create data source: ${JSON.stringify(createDataSourceResponse)}`);
        }
    
        const dataSourceId = createDataSourceResponse.data.id;
    
        // Step 5: Poll until data source is synched
        const finalStatus = await pollDataSourceStatus(dataset_id, dataSourceId);
    
        // Format the response as MCP content
        return {
          content: [
            {
              type: "text",
              text: JSON.stringify({
                dataset_id,
                data_source: {
                  id: dataSourceId,
                  name: finalStatus.data.name,
                  type: finalStatus.data.type,
                  status: finalStatus.data.status,
                  size: finalStatus.data.size
                },
                file: {
                  name: actualFileName,
                  size: fileSize,
                  object_key: file_object_key
                }
              }, null, 2)
            }
          ]
        };
      } catch (error: any) {
        console.error(`Error creating data source from local file: ${error.message}`);
        console.error(error.stack);
    
        // Return error response
        return {
          content: [
            {
              type: "text",
              text: JSON.stringify({
                error: `Error creating data source from local file: ${error.message}`,
                errorType: error.name || 'UnknownError',
                errorStack: process.env.NODE_ENV === 'development' ? error.stack : undefined
              }, null, 2)
            }
          ],
          isError: true
        };
      }
    }
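
    To make the chunk arithmetic in step 2 concrete, here is a small worked example. It assumes the server-issued part sizes equal the requested chunk_size (the startByte calculation above relies on the same assumption); the file size is made up:

    // Worked example: part boundaries for a 12 MB file with the default 5 MB chunks
    const chunkSize = 5 * 1024 * 1024;  // 5,242,880 bytes
    const fileSize = 12 * 1024 * 1024;  // 12,582,912 bytes
    for (let number = 1; (number - 1) * chunkSize < fileSize; number++) {
      const start = (number - 1) * chunkSize;
      const end = Math.min(start + chunkSize - 1, fileSize - 1); // inclusive, as createReadStream expects
      console.log(`part ${number}: bytes ${start}-${end}`);
    }
    // part 1: bytes 0-5242879
    // part 2: bytes 5242880-10485759
    // part 3: bytes 10485760-12582911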
  • PowerdrillClient method to create a data source in a dataset using the uploaded file object key.
    async createDataSource(datasetId: string, options: {
      name: string;
      type: string;
      file_object_key: string;
    }) {
      try {
        const requestBody = {
          ...options,
          user_id: this.config.userId
        };
    
        const response = await this.client.post(`/datasets/${datasetId}/datasources`, requestBody);
        return response.data;
      } catch (error: any) {
        console.error('Error creating data source:', error.message);
        throw error;
      }
    }
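
    A call to this method might look like the following sketch; the dataset ID and file object key are illustrative, and the response checks mirror those in the handler above:

    // Hypothetical usage of createDataSource (values are made up)
    const response = await client.createDataSource('dset-cm5axxxxxxxxx', {
      name: 'sales-2024.csv',
      type: 'FILE', // the handler above always creates FILE data sources
      file_object_key: 'uploads/abc123/sales-2024.csv'
    });
    if (response.code !== 0 || !response.data) {
      throw new Error(`Failed to create data source: ${JSON.stringify(response)}`);
    }
    const dataSourceId = response.data.id; // later polled until status is 'synched'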

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/powerdrillai/powerdrill-mcp'
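
The same request in TypeScript, as a minimal sketch (Node 18+ provides a global fetch; the response shape depends on the directory API):

    // Minimal sketch: fetch this server's directory entry (Node 18+ global fetch)
    const res = await fetch('https://glama.ai/api/mcp/v1/servers/powerdrillai/powerdrill-mcp');
    if (!res.ok) throw new Error(`Request failed: ${res.status}`);
    const server = await res.json();
    console.log(server);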

If you have feedback or need assistance with the MCP directory API, please join our Discord server.