Skip to main content
Glama

Multi Database MCP Server

hypertable.go11.3 kB
package timescale import ( "context" "fmt" "strings" ) // HypertableConfig defines configuration for creating a hypertable type HypertableConfig struct { TableName string TimeColumn string ChunkTimeInterval string PartitioningColumn string CreateIfNotExists bool SpacePartitions int // Number of space partitions (for multi-dimensional partitioning) IfNotExists bool // If true, don't error if table is already a hypertable MigrateData bool // If true, migrate existing data to chunks } // Hypertable represents a TimescaleDB hypertable type Hypertable struct { TableName string SchemaName string TimeColumn string SpaceColumn string NumDimensions int CompressionEnabled bool RetentionEnabled bool } // CreateHypertable converts a regular PostgreSQL table to a TimescaleDB hypertable func (t *DB) CreateHypertable(ctx context.Context, config HypertableConfig) error { if !t.isTimescaleDB { return fmt.Errorf("TimescaleDB extension not available") } // Construct the create_hypertable call var queryBuilder strings.Builder queryBuilder.WriteString("SELECT create_hypertable(") // Table name and time column are required queryBuilder.WriteString(fmt.Sprintf("'%s', '%s'", config.TableName, config.TimeColumn)) // Optional parameters if config.PartitioningColumn != "" { queryBuilder.WriteString(fmt.Sprintf(", partition_column => '%s'", config.PartitioningColumn)) } if config.ChunkTimeInterval != "" { queryBuilder.WriteString(fmt.Sprintf(", chunk_time_interval => INTERVAL '%s'", config.ChunkTimeInterval)) } if config.SpacePartitions > 0 { queryBuilder.WriteString(fmt.Sprintf(", number_partitions => %d", config.SpacePartitions)) } if config.IfNotExists { queryBuilder.WriteString(", if_not_exists => TRUE") } if config.MigrateData { queryBuilder.WriteString(", migrate_data => TRUE") } queryBuilder.WriteString(")") // Execute the query _, err := t.ExecuteSQLWithoutParams(ctx, queryBuilder.String()) if err != nil { return fmt.Errorf("failed to create hypertable: %w", err) } return nil } // AddDimension adds a new dimension (partitioning key) to a hypertable func (t *DB) AddDimension(ctx context.Context, tableName, columnName string, numPartitions int) error { if !t.isTimescaleDB { return fmt.Errorf("TimescaleDB extension not available") } query := fmt.Sprintf("SELECT add_dimension('%s', '%s', number_partitions => %d)", tableName, columnName, numPartitions) _, err := t.ExecuteSQLWithoutParams(ctx, query) if err != nil { return fmt.Errorf("failed to add dimension: %w", err) } return nil } // ListHypertables returns a list of all hypertables in the database func (t *DB) ListHypertables(ctx context.Context) ([]Hypertable, error) { if !t.isTimescaleDB { return nil, fmt.Errorf("TimescaleDB extension not available") } query := ` SELECT h.table_name, h.schema_name, d.column_name as time_column, count(d.id) as num_dimensions, ( SELECT column_name FROM _timescaledb_catalog.dimension WHERE hypertable_id = h.id AND column_type != 'TIMESTAMP' AND column_type != 'TIMESTAMPTZ' LIMIT 1 ) as space_column FROM _timescaledb_catalog.hypertable h JOIN _timescaledb_catalog.dimension d ON h.id = d.hypertable_id GROUP BY h.id, h.table_name, h.schema_name ` result, err := t.ExecuteSQLWithoutParams(ctx, query) if err != nil { return nil, fmt.Errorf("failed to list hypertables: %w", err) } rows, ok := result.([]map[string]interface{}) if !ok { return nil, fmt.Errorf("unexpected result type from database query") } var hypertables []Hypertable for _, row := range rows { ht := Hypertable{ TableName: fmt.Sprintf("%v", row["table_name"]), SchemaName: fmt.Sprintf("%v", row["schema_name"]), TimeColumn: fmt.Sprintf("%v", row["time_column"]), } // Handle nullable columns if row["space_column"] != nil { ht.SpaceColumn = fmt.Sprintf("%v", row["space_column"]) } // Convert numeric dimensions if dimensions, ok := row["num_dimensions"].(int64); ok { ht.NumDimensions = int(dimensions) } else if dimensions, ok := row["num_dimensions"].(int); ok { ht.NumDimensions = dimensions } // Check if compression is enabled compQuery := fmt.Sprintf( "SELECT count(*) > 0 as is_compressed FROM timescaledb_information.compression_settings WHERE hypertable_name = '%s'", ht.TableName, ) compResult, err := t.ExecuteSQLWithoutParams(ctx, compQuery) if err == nil { if compRows, ok := compResult.([]map[string]interface{}); ok && len(compRows) > 0 { if isCompressed, ok := compRows[0]["is_compressed"].(bool); ok { ht.CompressionEnabled = isCompressed } } } // Check if retention policy is enabled retQuery := fmt.Sprintf( "SELECT count(*) > 0 as has_retention FROM timescaledb_information.jobs WHERE hypertable_name = '%s' AND proc_name = 'policy_retention'", ht.TableName, ) retResult, err := t.ExecuteSQLWithoutParams(ctx, retQuery) if err == nil { if retRows, ok := retResult.([]map[string]interface{}); ok && len(retRows) > 0 { if hasRetention, ok := retRows[0]["has_retention"].(bool); ok { ht.RetentionEnabled = hasRetention } } } hypertables = append(hypertables, ht) } return hypertables, nil } // GetHypertable gets information about a specific hypertable func (t *DB) GetHypertable(ctx context.Context, tableName string) (*Hypertable, error) { if !t.isTimescaleDB { return nil, fmt.Errorf("TimescaleDB extension not available") } query := fmt.Sprintf(` SELECT h.table_name, h.schema_name, d.column_name as time_column, count(d.id) as num_dimensions, ( SELECT column_name FROM _timescaledb_catalog.dimension WHERE hypertable_id = h.id AND column_type != 'TIMESTAMP' AND column_type != 'TIMESTAMPTZ' LIMIT 1 ) as space_column FROM _timescaledb_catalog.hypertable h JOIN _timescaledb_catalog.dimension d ON h.id = d.hypertable_id WHERE h.table_name = '%s' GROUP BY h.id, h.table_name, h.schema_name `, tableName) result, err := t.ExecuteSQLWithoutParams(ctx, query) if err != nil { return nil, fmt.Errorf("failed to get hypertable information: %w", err) } rows, ok := result.([]map[string]interface{}) if !ok || len(rows) == 0 { return nil, fmt.Errorf("table '%s' is not a hypertable", tableName) } row := rows[0] ht := &Hypertable{ TableName: fmt.Sprintf("%v", row["table_name"]), SchemaName: fmt.Sprintf("%v", row["schema_name"]), TimeColumn: fmt.Sprintf("%v", row["time_column"]), } // Handle nullable columns if row["space_column"] != nil { ht.SpaceColumn = fmt.Sprintf("%v", row["space_column"]) } // Convert numeric dimensions if dimensions, ok := row["num_dimensions"].(int64); ok { ht.NumDimensions = int(dimensions) } else if dimensions, ok := row["num_dimensions"].(int); ok { ht.NumDimensions = dimensions } // Check if compression is enabled compQuery := fmt.Sprintf( "SELECT count(*) > 0 as is_compressed FROM timescaledb_information.compression_settings WHERE hypertable_name = '%s'", ht.TableName, ) compResult, err := t.ExecuteSQLWithoutParams(ctx, compQuery) if err == nil { if compRows, ok := compResult.([]map[string]interface{}); ok && len(compRows) > 0 { if isCompressed, ok := compRows[0]["is_compressed"].(bool); ok { ht.CompressionEnabled = isCompressed } } } // Check if retention policy is enabled retQuery := fmt.Sprintf( "SELECT count(*) > 0 as has_retention FROM timescaledb_information.jobs WHERE hypertable_name = '%s' AND proc_name = 'policy_retention'", ht.TableName, ) retResult, err := t.ExecuteSQLWithoutParams(ctx, retQuery) if err == nil { if retRows, ok := retResult.([]map[string]interface{}); ok && len(retRows) > 0 { if hasRetention, ok := retRows[0]["has_retention"].(bool); ok { ht.RetentionEnabled = hasRetention } } } return ht, nil } // DropHypertable drops a hypertable func (t *DB) DropHypertable(ctx context.Context, tableName string, cascade bool) error { if !t.isTimescaleDB { return fmt.Errorf("TimescaleDB extension not available") } // Build the DROP TABLE query query := fmt.Sprintf("DROP TABLE %s", tableName) if cascade { query += " CASCADE" } _, err := t.ExecuteSQLWithoutParams(ctx, query) if err != nil { return fmt.Errorf("failed to drop hypertable: %w", err) } return nil } // CheckIfHypertable checks if a table is a hypertable func (t *DB) CheckIfHypertable(ctx context.Context, tableName string) (bool, error) { if !t.isTimescaleDB { return false, fmt.Errorf("TimescaleDB extension not available") } query := fmt.Sprintf(` SELECT EXISTS ( SELECT 1 FROM _timescaledb_catalog.hypertable WHERE table_name = '%s' ) as is_hypertable `, tableName) result, err := t.ExecuteSQLWithoutParams(ctx, query) if err != nil { return false, fmt.Errorf("failed to check if table is a hypertable: %w", err) } rows, ok := result.([]map[string]interface{}) if !ok || len(rows) == 0 { return false, fmt.Errorf("unexpected result from database query") } isHypertable, ok := rows[0]["is_hypertable"].(bool) if !ok { return false, fmt.Errorf("unexpected result type for is_hypertable") } return isHypertable, nil } // RecentChunks returns information about the most recent chunks func (t *DB) RecentChunks(ctx context.Context, tableName string, limit int) (interface{}, error) { if !t.isTimescaleDB { return nil, fmt.Errorf("TimescaleDB extension not available") } // Use default limit of 10 if not specified if limit <= 0 { limit = 10 } query := fmt.Sprintf(` SELECT chunk_name, range_start, range_end FROM timescaledb_information.chunks WHERE hypertable_name = '%s' ORDER BY range_end DESC LIMIT %d `, tableName, limit) result, err := t.ExecuteSQLWithoutParams(ctx, query) if err != nil { return nil, fmt.Errorf("failed to get recent chunks: %w", err) } return result, nil } // CreateHypertable is a helper function to create a hypertable with the given configuration options. // This is exported for use by other packages. func CreateHypertable(ctx context.Context, db *DB, table, timeColumn string, opts ...HypertableOption) error { config := HypertableConfig{ TableName: table, TimeColumn: timeColumn, } // Apply all options for _, opt := range opts { opt(&config) } return db.CreateHypertable(ctx, config) } // HypertableOption is a functional option for CreateHypertable type HypertableOption func(*HypertableConfig) // WithChunkInterval sets the chunk time interval func WithChunkInterval(interval string) HypertableOption { return func(config *HypertableConfig) { config.ChunkTimeInterval = interval } } // WithPartitioningColumn sets the space partitioning column func WithPartitioningColumn(column string) HypertableOption { return func(config *HypertableConfig) { config.PartitioningColumn = column } } // WithIfNotExists sets the if_not_exists flag func WithIfNotExists(ifNotExists bool) HypertableOption { return func(config *HypertableConfig) { config.IfNotExists = ifNotExists } } // WithMigrateData sets the migrate_data flag func WithMigrateData(migrateData bool) HypertableOption { return func(config *HypertableConfig) { config.MigrateData = migrateData } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/FreePeak/db-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server