Skip to main content
Glama
FreePeak

Multi Database MCP Server

continuous_aggregate.go9.17 kB
package timescale import ( "context" "fmt" "strings" ) // ContinuousAggregateOptions encapsulates options for creating a continuous aggregate type ContinuousAggregateOptions struct { // Required parameters ViewName string // Name of the continuous aggregate view to create SourceTable string // Source table with raw data TimeColumn string // Time column to bucket BucketInterval string // Time bucket interval (e.g., '1 hour', '1 day') // Optional parameters Aggregations []ColumnAggregation // Aggregations to include in the view WhereCondition string // WHERE condition to filter source data WithData bool // Whether to materialize data immediately (WITH DATA) RefreshPolicy bool // Whether to add a refresh policy RefreshInterval string // Refresh interval (default: '1 day') RefreshLookback string // How far back to look when refreshing (default: '1 week') MaterializedOnly bool // Whether to materialize only (no real-time) CreateIfNotExists bool // Whether to use IF NOT EXISTS } // ContinuousAggregatePolicyOptions encapsulates options for refresh policies type ContinuousAggregatePolicyOptions struct { ViewName string // Name of the continuous aggregate view Start string // Start offset (e.g., '-2 days') End string // End offset (e.g., 'now()') ScheduleInterval string // Execution interval (e.g., '1 hour') } // CreateContinuousAggregate creates a new continuous aggregate view func (t *DB) CreateContinuousAggregate(ctx context.Context, options ContinuousAggregateOptions) error { if !t.isTimescaleDB { return fmt.Errorf("TimescaleDB extension not available") } var builder strings.Builder // Build CREATE MATERIALIZED VIEW statement builder.WriteString("CREATE MATERIALIZED VIEW ") // Add IF NOT EXISTS clause if requested if options.CreateIfNotExists { builder.WriteString("IF NOT EXISTS ") } // Add view name builder.WriteString(options.ViewName) builder.WriteString("\n") // Add WITH clause for materialized_only if requested if options.MaterializedOnly { builder.WriteString("WITH (timescaledb.materialized_only=true)\n") } // Start SELECT statement builder.WriteString("AS SELECT\n ") // Add time bucket builder.WriteString(fmt.Sprintf("time_bucket('%s', %s) as time_bucket", options.BucketInterval, options.TimeColumn)) // Add aggregations if len(options.Aggregations) > 0 { for _, agg := range options.Aggregations { colName := agg.Alias if colName == "" { colName = strings.ToLower(string(agg.Function)) + "_" + agg.Column } builder.WriteString(fmt.Sprintf(",\n %s(%s) as %s", agg.Function, agg.Column, colName)) } } else { // Default to count(*) if no aggregations specified builder.WriteString(",\n COUNT(*) as count") } // Add FROM clause builder.WriteString(fmt.Sprintf("\nFROM %s\n", options.SourceTable)) // Add WHERE clause if specified if options.WhereCondition != "" { builder.WriteString(fmt.Sprintf("WHERE %s\n", options.WhereCondition)) } // Add GROUP BY clause builder.WriteString("GROUP BY time_bucket\n") // Add WITH DATA or WITH NO DATA if options.WithData { builder.WriteString("WITH DATA") } else { builder.WriteString("WITH NO DATA") } // Execute the statement _, err := t.ExecuteSQLWithoutParams(ctx, builder.String()) if err != nil { return fmt.Errorf("failed to create continuous aggregate: %w", err) } // Add refresh policy if requested if options.RefreshPolicy { refreshInterval := options.RefreshInterval if refreshInterval == "" { refreshInterval = "1 day" } refreshLookback := options.RefreshLookback if refreshLookback == "" { refreshLookback = "1 week" } err = t.AddContinuousAggregatePolicy(ctx, ContinuousAggregatePolicyOptions{ ViewName: options.ViewName, Start: fmt.Sprintf("-%s", refreshLookback), End: "now()", ScheduleInterval: refreshInterval, }) if err != nil { return fmt.Errorf("created continuous aggregate but failed to add refresh policy: %w", err) } } return nil } // RefreshContinuousAggregate refreshes a continuous aggregate for a specific time range func (t *DB) RefreshContinuousAggregate(ctx context.Context, viewName, startTime, endTime string) error { if !t.isTimescaleDB { return fmt.Errorf("TimescaleDB extension not available") } var builder strings.Builder // Build CALL statement builder.WriteString("CALL refresh_continuous_aggregate(") // Add view name builder.WriteString(fmt.Sprintf("'%s'", viewName)) // Add time range if specified if startTime != "" && endTime != "" { builder.WriteString(fmt.Sprintf(", '%s'::timestamptz, '%s'::timestamptz", startTime, endTime)) } else { builder.WriteString(", NULL, NULL") } builder.WriteString(")") // Execute the statement _, err := t.ExecuteSQLWithoutParams(ctx, builder.String()) if err != nil { return fmt.Errorf("failed to refresh continuous aggregate: %w", err) } return nil } // AddContinuousAggregatePolicy adds a refresh policy to a continuous aggregate func (t *DB) AddContinuousAggregatePolicy(ctx context.Context, options ContinuousAggregatePolicyOptions) error { if !t.isTimescaleDB { return fmt.Errorf("TimescaleDB extension not available") } // Build policy creation SQL sql := fmt.Sprintf( "SELECT add_continuous_aggregate_policy('%s', start_offset => INTERVAL '%s', "+ "end_offset => INTERVAL '%s', schedule_interval => INTERVAL '%s')", options.ViewName, options.Start, options.End, options.ScheduleInterval, ) // Execute the statement _, err := t.ExecuteSQLWithoutParams(ctx, sql) if err != nil { return fmt.Errorf("failed to add continuous aggregate policy: %w", err) } return nil } // RemoveContinuousAggregatePolicy removes a refresh policy from a continuous aggregate func (t *DB) RemoveContinuousAggregatePolicy(ctx context.Context, viewName string) error { if !t.isTimescaleDB { return fmt.Errorf("TimescaleDB extension not available") } // Build policy removal SQL sql := fmt.Sprintf( "SELECT remove_continuous_aggregate_policy('%s')", viewName, ) // Execute the statement _, err := t.ExecuteSQLWithoutParams(ctx, sql) if err != nil { return fmt.Errorf("failed to remove continuous aggregate policy: %w", err) } return nil } // DropContinuousAggregate drops a continuous aggregate func (t *DB) DropContinuousAggregate(ctx context.Context, viewName string, cascade bool) error { if !t.isTimescaleDB { return fmt.Errorf("TimescaleDB extension not available") } var builder strings.Builder // Build DROP statement builder.WriteString(fmt.Sprintf("DROP MATERIALIZED VIEW %s", viewName)) // Add CASCADE if requested if cascade { builder.WriteString(" CASCADE") } // Execute the statement _, err := t.ExecuteSQLWithoutParams(ctx, builder.String()) if err != nil { return fmt.Errorf("failed to drop continuous aggregate: %w", err) } return nil } // GetContinuousAggregateInfo gets detailed information about a continuous aggregate func (t *DB) GetContinuousAggregateInfo(ctx context.Context, viewName string) (map[string]interface{}, error) { if !t.isTimescaleDB { return nil, fmt.Errorf("TimescaleDB extension not available") } // Query for continuous aggregate information query := fmt.Sprintf(` WITH policy_info AS ( SELECT ca.user_view_name, p.schedule_interval, p.start_offset, p.end_offset FROM timescaledb_information.continuous_aggregates ca LEFT JOIN timescaledb_information.jobs j ON j.hypertable_name = ca.user_view_name LEFT JOIN timescaledb_information.policies p ON p.job_id = j.job_id WHERE p.proc_name = 'policy_refresh_continuous_aggregate' AND ca.view_name = '%s' ), size_info AS ( SELECT pg_size_pretty(pg_total_relation_size(format('%%I.%%I', schemaname, tablename))) as view_size FROM pg_tables WHERE tablename = '%s' ) SELECT ca.view_name, ca.view_schema, ca.materialized_only, ca.view_definition, ca.refresh_lag, ca.refresh_interval, ca.hypertable_name, ca.hypertable_schema, pi.schedule_interval, pi.start_offset, pi.end_offset, si.view_size, ( SELECT min(time_bucket) FROM %s ) as min_time, ( SELECT max(time_bucket) FROM %s ) as max_time FROM timescaledb_information.continuous_aggregates ca LEFT JOIN policy_info pi ON pi.user_view_name = ca.user_view_name CROSS JOIN size_info si WHERE ca.view_name = '%s' `, viewName, viewName, viewName, viewName, viewName) // Execute query result, err := t.ExecuteSQLWithoutParams(ctx, query) if err != nil { return nil, fmt.Errorf("failed to get continuous aggregate info: %w", err) } // Convert result to map rows, ok := result.([]map[string]interface{}) if !ok || len(rows) == 0 { return nil, fmt.Errorf("continuous aggregate '%s' not found", viewName) } // Extract the first row info := rows[0] // Add computed fields info["has_policy"] = info["schedule_interval"] != nil return info, nil }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/FreePeak/db-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server