Skip to main content
Glama
parquet.tools.ts25.5 kB
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; import { z } from 'zod'; import axios from 'axios'; // Import hyparquet with correct function names import { parquetReadObjects, asyncBufferFromUrl } from 'hyparquet'; // Import compressors for BROTLI support import { compressors } from 'hyparquet-compressors'; import { prefixToolName } from './utils/tool-naming.js'; /** * Custom JSON serializer that handles BigInt values by converting them to strings * @param key The key of the current property being serialized * @param value The value of the current property being serialized * @returns The serialized value */ function bigIntSerializer(key: string, value: any): any { // Convert BigInt values to strings if (typeof value === 'bigint') { return value.toString(); } return value; } /** * Process an object to convert all BigInt values to strings * This is needed because JSON.stringify cannot handle BigInt values directly * @param obj The object to process * @returns A new object with all BigInt values converted to strings */ function processBigIntValues(obj: any): any { if (obj === null || obj === undefined) { return obj; } if (typeof obj === 'bigint') { return obj.toString(); } if (Array.isArray(obj)) { return obj.map(item => processBigIntValues(item)); } if (typeof obj === 'object') { const result: Record<string, any> = {}; for (const key in obj) { if (Object.prototype.hasOwnProperty.call(obj, key)) { result[key] = processBigIntValues(obj[key]); } } return result; } return obj; } /** * Process empty date objects in parquet data * @param data Array of data rows to process * @param url URL of the parquet file * @returns Processed data with proper date strings */ function processEmptyDateObjects(data: any[], url: string): any[] { if (!data || data.length === 0) { return data; } // Extract dataset name from URL const urlParts = url.split('/'); const filename = urlParts[urlParts.length - 1] || ''; const datasetName = filename.split('.')[0] || ''; // Find the date column(s) in the data const firstRow = data[0]; const dateColumns: string[] = []; // Look for columns that might be date columns Object.keys(firstRow).forEach(key => { const lowerKey = key.toLowerCase(); if (lowerKey === 'date' || lowerKey.includes('date') || lowerKey === 'dt' || lowerKey === 'timestamp') { dateColumns.push(key); } }); if (dateColumns.length === 0) { return data; // No date columns found } // Check if we have any rows with valid date strings to use as a pattern let datePattern: string | null = null; for (const dateColumn of dateColumns) { for (const row of data) { const dateValue = row[dateColumn]; if (typeof dateValue === 'string' && dateValue.trim() !== '') { datePattern = dateValue; break; } } if (datePattern) break; } // Dataset-specific handling if (datasetName === 'employment_sector') { // For employment_sector dataset, we know it's annual data from 2001-2022 // with 3 sectors (agriculture, industry, services) and 3 sexes (both, male, female) const uniqueSectors = new Set(); const uniqueSexes = new Set(); data.forEach(row => { if (row.sector) uniqueSectors.add(row.sector); if (row.sex) uniqueSexes.add(row.sex); }); const rowsPerYear = uniqueSectors.size * uniqueSexes.size || 9; const startYear = 2001; // First year in the dataset // Process each row return data.map((row, index) => { const newRow = { ...row }; // Check if there's a date field that's an empty object if (row.date && typeof row.date === 'object' && Object.keys(row.date).length === 0) { // Calculate which year this row belongs to const yearIndex = Math.floor(index / rowsPerYear); const year = startYear + yearIndex; // Replace empty date object with a proper date string newRow.date = `${year}-01-01 00:00:00`; } return newRow; }); } // Generic handling for other datasets return data.map(row => { const newRow = { ...row }; // Process each date column for (const dateColumn of dateColumns) { const dateValue = row[dateColumn]; // Check if there's a date field that's an empty object if (dateValue && typeof dateValue === 'object' && Object.keys(dateValue).length === 0) { // If we have a date pattern, use it as a template if (datePattern) { // Extract the format from the pattern (e.g., YYYY-MM-DD HH:MM:SS) if (datePattern.includes(' ')) { // Full datetime format newRow[dateColumn] = `${new Date().getFullYear()}-01-01 00:00:00`; // Use current year } else { // Date only format newRow[dateColumn] = `${new Date().getFullYear()}-01-01`; // Use current year } } else { // Default format if no pattern found - use current date for latest data const now = new Date(); const year = now.getFullYear(); const month = String(now.getMonth() + 1).padStart(2, '0'); const day = String(now.getDate()).padStart(2, '0'); newRow[dateColumn] = `${year}-${month}-${day}`; } } } return newRow; }); } /** * Extract the latest date from parquet data * @param data The parquet data to analyze * @returns The latest date in YYYY-MM format, or undefined if no date found */ function extractLatestDate(data: any[]): string | undefined { if (!data || data.length === 0) { return undefined; } // Check if data has a date column const firstRow = data[0]; let dateColumn: string | null = null; // Find the date column - could be named 'date', 'dt', or contain 'date' in the name for (const key of Object.keys(firstRow)) { const lowerKey = key.toLowerCase(); if (lowerKey === 'date' || lowerKey === 'dt' || lowerKey.includes('date')) { dateColumn = key; break; } } // If no date column found, check if this is callout data with 'latest' values // This is a special case for the epayment_systems_timeseries_callout.parquet file if (!dateColumn && firstRow.hasOwnProperty('latest') && firstRow.hasOwnProperty('chart')) { // This is likely callout data, which represents the latest month // Use the current date as the reference const now = new Date(); const year = now.getFullYear(); const month = String(now.getMonth() + 1).padStart(2, '0'); return `${year}-${month}`; } if (!dateColumn) { // If we still don't have a date column, try to infer from the data structure // For time series data, we can assume the latest data is for the current month const now = new Date(); const year = now.getFullYear(); const month = String(now.getMonth() + 1).padStart(2, '0'); return `${year}-${month}`; } // Extract all dates and find the latest one const dates: Date[] = []; for (const row of data) { const dateValue = row[dateColumn]; if (dateValue) { // Handle different date formats let date: Date | null = null; if (dateValue instanceof Date) { date = dateValue; } else if (typeof dateValue === 'string') { // Try to parse the date string date = new Date(dateValue); } else if (typeof dateValue === 'number') { // Assume timestamp in milliseconds date = new Date(dateValue); } else if (typeof dateValue === 'object') { // Some date objects might be serialized in a special way // Try to extract year and month if available if (dateValue.year && dateValue.month) { date = new Date(dateValue.year, dateValue.month - 1); } else { // If we can't extract a proper date, use current date date = new Date(); } } if (date && !isNaN(date.getTime())) { dates.push(date); } } } if (dates.length === 0) { // If we couldn't extract any valid dates, use the current date const now = new Date(); const year = now.getFullYear(); const month = String(now.getMonth() + 1).padStart(2, '0'); return `${year}-${month}`; } // Find the latest date const latestDate = new Date(Math.max(...dates.map(d => d.getTime()))); // Format as YYYY-MM const year = latestDate.getFullYear(); const month = String(latestDate.getMonth() + 1).padStart(2, '0'); return `${year}-${month}`; } // Define the structure for Parquet metadata interface ParquetMetadata { filename: string; url: string; fileSize?: number; lastModified?: string; contentType?: string; } /** * Helper function to get metadata about a parquet file from a URL * @param url URL of the parquet file * @returns Metadata about the parquet file */ async function getParquetMetadata(url: string): Promise<ParquetMetadata> { try { // First make a HEAD request to get metadata without downloading the file const headResponse = await axios({ method: 'head', url, timeout: 5000, // 5 second timeout }); // Extract the filename from the URL const filename = url.split('/').pop() || 'unknown.parquet'; return { filename, url, fileSize: parseInt(headResponse.headers['content-length'] || '0', 10), lastModified: headResponse.headers['last-modified'], contentType: headResponse.headers['content-type'], }; } catch (error) { console.error('Error getting parquet file metadata:', error); // Return basic metadata even if HEAD request fails return { filename: url.split('/').pop() || 'unknown.parquet', url, }; } } /** * Get information about a parquet file's structure based on common patterns * @param filename Name of the parquet file * @returns Estimated structure information */ function estimateParquetStructure(filename: string): Record<string, any> { // Extract information from filename const nameParts = filename.replace('.parquet', '').split('_'); // Try to guess the structure based on common naming patterns if (filename.includes('epayment') || filename.includes('payment')) { return { estimatedSchema: { 'date': 'datetime', 'payment_channel': 'string', 'transaction_count': 'integer', 'transaction_value': 'float', 'growth_rate': 'float', }, estimatedRowCount: 'Unknown (typically monthly data for 2-5 years)', possibleVisualization: 'Time series chart showing payment channel usage over time', dataType: 'Payment transaction data', }; } if (filename.includes('timeseries')) { return { estimatedSchema: { 'date': 'datetime', 'value': 'float', 'category': 'string', }, estimatedRowCount: 'Unknown (typically daily or monthly time series)', possibleVisualization: 'Line chart showing trends over time', dataType: 'Time series data', }; } // Generic estimation based on filename parts const estimatedColumns: Record<string, string> = {}; nameParts.forEach(part => { if (part.includes('date') || part === 'dt') { estimatedColumns['date'] = 'datetime'; } else if (part.includes('value') || part.includes('amount')) { estimatedColumns[part] = 'float'; } else if (part.includes('count') || part.includes('num')) { estimatedColumns[part] = 'integer'; } else if (part.includes('id')) { estimatedColumns[part] = 'string'; } else if (part.includes('name') || part.includes('type') || part.includes('category')) { estimatedColumns[part] = 'string'; } }); // Add some generic columns if we couldn't extract much if (Object.keys(estimatedColumns).length < 2) { estimatedColumns['value'] = 'unknown'; estimatedColumns['category'] = 'unknown'; } return { estimatedSchema: estimatedColumns, estimatedRowCount: 'Unknown', dataType: 'Unknown structured data', }; } /** * Generate a dashboard URL from a parquet file URL * @param parquetUrl URL of the parquet file * @returns Estimated dashboard URL */ function estimateDashboardUrl(parquetUrl: string): string { // Extract the filename and try to map it to a dashboard const filename = parquetUrl.split('/').pop() || ''; if (filename.includes('epayment') || filename.includes('payment')) { return 'https://data.gov.my/dashboard/electronic-payments'; } if (filename.includes('dosm') || filename.includes('statistics')) { return 'https://open.dosm.gov.my/dashboard'; } // Default to the main dashboard page return 'https://data.gov.my/dashboard'; } /** * Parse a Parquet file from a URL using hyparquet * @param url URL of the Parquet file * @param maxRows Maximum number of rows to return * @param datasetId Optional dataset ID to fetch metadata * @returns Parsed Parquet data */ async function parseParquetFromUrl(url: string, maxRows: number = 500, datasetId?: string): Promise<any> { try { // Create an async buffer from the URL const file = await asyncBufferFromUrl({ url }); // Parse the Parquet file using hyparquet with compressors for BROTLI support const rowEnd = maxRows > 0 ? maxRows : undefined; const parquetData = await parquetReadObjects({ file, rowStart: 0, rowEnd, compressors // Add compressors to support BROTLI compression }); // Process the data to handle BigInt values let processedData = processBigIntValues(parquetData); // Process empty date objects processedData = processEmptyDateObjects(processedData, url); // Get metadata to extract schema information // This is a workaround since we don't have direct schema access // We'll infer schema from the first row const schema: Record<string, string> = {}; if (processedData.length > 0) { const firstRow = processedData[0]; Object.keys(firstRow).forEach(key => { const value = firstRow[key]; // Store the original type if it was a BigInt const originalValue = parquetData[0][key]; // Special handling for date fields if (key.toLowerCase() === 'date') { schema[key] = 'string'; } else { schema[key] = typeof originalValue === 'bigint' ? 'bigint' : typeof value; } }); } // Removed sorting and filtering code // Get the total number of rows - approximation since we don't have direct access const totalRows = processedData.length; // Extract the latest date from the data const latestDate = extractLatestDate(processedData); return { schema, totalRows, displayedRows: processedData.length, data: processedData, detected_date: latestDate // Include the detected date in the response }; } catch (error) { console.error('Error parsing Parquet file:', error); throw error; } } export function registerParquetTools(server: McpServer) { // Parse a Parquet file from a URL server.tool( prefixToolName('parse_parquet_file'), 'Parse and display data from a Parquet file URL', { url: z.string().url().describe('URL of the Parquet file to parse'), maxRows: z.number().min(1).max(2000).optional().describe('Maximum number of rows to return (1-2000)'), }, async ({ url, maxRows = 500 }) => { try { // Extract the filename from the URL const filename = url.split('/').pop() || 'unknown.parquet'; // Removed dataset metadata handling code // Parse the Parquet file const parquetData = await parseParquetFromUrl(url, maxRows); // Format the data for display const currentDate = new Date(); const currentYear = currentDate.getFullYear(); const currentMonth = currentDate.getMonth() + 1; // Get the detected date or use current date const detectedDate = parquetData.detected_date || `${currentYear}-${String(currentMonth).padStart(2, '0')}`; // Parse the detected date const [year, month] = detectedDate.split('-').map((part: string) => parseInt(part, 10)); // Format the date for display const monthNames = ['January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December']; const formattedMonth = monthNames[month - 1] || 'Unknown'; const formattedYear = year || currentYear; const formattedData = { filename, url, schema: parquetData.schema, totalRows: parquetData.totalRows, displayedRows: parquetData.displayedRows, data: parquetData.data, data_as_of: detectedDate, data_as_of_formatted: `${formattedMonth} ${formattedYear}`, // Add formatted date timestamp: new Date().toISOString() }; return { content: [ { type: 'text', text: JSON.stringify(formattedData, bigIntSerializer, 2), }, ], }; } catch (error) { // If parsing fails, fall back to metadata and estimation try { // Get metadata about the Parquet file const metadata = await getParquetMetadata(url); // Estimate the structure based on the filename const structureInfo = estimateParquetStructure(metadata.filename); // Estimate the dashboard URL const dashboardUrl = estimateDashboardUrl(url); // Format the information for display const formattedInfo = { filename: metadata.filename, url: metadata.url, fileSize: metadata.fileSize ? `${Math.round(metadata.fileSize / 1024 / 1024 * 100) / 100} MB` : 'Unknown', lastModified: metadata.lastModified || 'Unknown', contentType: metadata.contentType || 'application/octet-stream', estimatedStructure: structureInfo, viewableAt: dashboardUrl, error: 'Failed to parse Parquet file', errorMessage: error instanceof Error ? error.message : String(error), note: 'Falling back to estimated structure. You can view the data at the dashboard URL.', timestamp: new Date().toISOString() }; return { content: [ { type: 'text', text: JSON.stringify(formattedInfo, null, 2), }, ], }; } catch (fallbackError) { return { content: [ { type: 'text', text: JSON.stringify({ error: 'Failed to process Parquet file', message: error instanceof Error ? error.message : String(error), fallbackError: fallbackError instanceof Error ? fallbackError.message : String(fallbackError), url, timestamp: new Date().toISOString(), note: 'Parquet files can be viewed through their corresponding dashboards on data.gov.my', }, null, 2), }, ], }; } } } ); // Get information about a Parquet file from a URL server.tool( prefixToolName('get_parquet_info'), 'Get metadata and structure information about a Parquet file', { url: z.string().url().describe('URL of the Parquet file to analyze'), }, async ({ url }) => { try { // Try to parse the Parquet file to get accurate schema information const parquetData = await parseParquetFromUrl(url, 0); // Get metadata about the Parquet file const metadata = await getParquetMetadata(url); // Estimate the dashboard URL const dashboardUrl = estimateDashboardUrl(url); // Format the information for display const currentDate = new Date(); const currentYear = currentDate.getFullYear(); const currentMonth = currentDate.getMonth() + 1; // Get the detected date or use current date const detectedDate = parquetData.detected_date || `${currentYear}-${String(currentMonth).padStart(2, '0')}`; // Parse the detected date const [year, month] = detectedDate.split('-').map((part: string) => parseInt(part, 10)); // Format the date for display const monthNames = ['January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December']; const formattedMonth = monthNames[month - 1] || 'Unknown'; const formattedYear = year || currentYear; const formattedInfo = { filename: metadata.filename, url: metadata.url, fileSize: metadata.fileSize ? `${Math.round(metadata.fileSize / 1024 / 1024 * 100) / 100} MB` : 'Unknown', lastModified: metadata.lastModified || 'Unknown', contentType: metadata.contentType || 'application/octet-stream', schema: parquetData.schema, totalRows: parquetData.totalRows, data_as_of: detectedDate, data_as_of_formatted: `${formattedMonth} ${formattedYear}`, // Add formatted date viewableAt: dashboardUrl, timestamp: new Date().toISOString() }; return { content: [ { type: 'text', text: JSON.stringify(formattedInfo, bigIntSerializer, 2), }, ], }; } catch (error) { // Fall back to estimation if parsing fails try { // Get metadata about the Parquet file const metadata = await getParquetMetadata(url); // Estimate the structure based on the filename const structureInfo = estimateParquetStructure(metadata.filename); // Estimate the dashboard URL const dashboardUrl = estimateDashboardUrl(url); // Format the information for display const formattedInfo = { filename: metadata.filename, url: metadata.url, fileSize: metadata.fileSize ? `${Math.round(metadata.fileSize / 1024 / 1024 * 100) / 100} MB` : 'Unknown', lastModified: metadata.lastModified || 'Unknown', contentType: metadata.contentType || 'application/octet-stream', estimatedStructure: structureInfo, viewableAt: dashboardUrl, note: 'Could not parse the Parquet file directly. This is an estimation based on the filename and common patterns.', timestamp: new Date().toISOString() }; return { content: [ { type: 'text', text: JSON.stringify(formattedInfo, null, 2), }, ], }; } catch (fallbackError) { return { content: [ { type: 'text', text: JSON.stringify({ error: 'Failed to process Parquet file', message: error instanceof Error ? error.message : String(error), fallbackError: fallbackError instanceof Error ? fallbackError.message : String(fallbackError), url, timestamp: new Date().toISOString(), note: 'Parquet files can be viewed through their corresponding dashboards on data.gov.my', }, null, 2), }, ], }; } } } ); // Link to dashboard for a parquet file server.tool( prefixToolName('find_dashboard_for_parquet'), 'Find the corresponding dashboard for a Parquet file', { url: z.string().url().describe('URL of the parquet file'), }, async ({ url }) => { try { // Extract the filename from the URL const filename = url.split('/').pop() || 'unknown.parquet'; // Estimate the dashboard URL const dashboardUrl = estimateDashboardUrl(url); return { content: [ { type: 'text', text: JSON.stringify({ filename, parquetUrl: url, dashboardUrl, note: 'This is the estimated dashboard where you can view the visualized data from this Parquet file.', timestamp: new Date().toISOString() }, bigIntSerializer, 2), }, ], }; } catch (error) { return { content: [ { type: 'text', text: JSON.stringify({ error: 'Failed to find dashboard', message: error instanceof Error ? error.message : String(error), url, fallbackUrl: 'https://data.gov.my/dashboard', timestamp: new Date().toISOString() }, bigIntSerializer, 2), }, ], }; } } ); }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/hithereiamaliff/mcp-datagovmy'

If you have feedback or need assistance with the MCP directory API, please join our Discord server