
Tecton MCP Server

Official, by tecton-ai
generate_examples_parquet.py (14.2 kB)
#!/usr/bin/env python3
"""
Generate examples parquet files from Tecton code repositories.

This script replaces the outdated generate_examples_parquet.ipynb notebook that
relied on the deprecated tecton_gen_ai library. It uses OpenAI's API directly to
extract Tecton declarations from Python files.

The script processes code from specified directories and generates two separate
parquet files:
- examples_spark.parquet: For Spark-based Tecton code
- examples_rift.parquet: For Rift-based Tecton code
"""

import os
import re
import json
import pandas as pd
import tqdm
from typing import List, Dict, Any, Tuple
from pydantic import BaseModel, Field
from openai import OpenAI
from pathlib import Path
from enum import Enum

# Initialize OpenAI client
client = OpenAI()


class DirectoryType(Enum):
    RIFT = "rift"
    SPARK = "spark"


class DirectoryConfig:
    def __init__(self, path: str, directory_type: DirectoryType):
        self.path = os.path.expanduser(path)
        self.type = directory_type


def get_py_files(directory: str) -> List[str]:
    """Recursively find all Python files in a directory."""
    files = []
    for root, dirs, filenames in os.walk(directory):
        for filename in filenames:
            if filename.endswith('.py'):
                files.append(os.path.join(root, filename))
    return files


def extract_declarations_from_code(code: str) -> List[Tuple[str, str]]:
    """Extract Tecton declarations from Python code using OpenAI."""
    try:
        response = client.chat.completions.create(
            model="gpt-4.1",
            messages=[
                {
                    "role": "system",
                    "content": """Extract Tecton-related declarations from the code, as a list of tuples of declarations.
Each tuple contains the object/function name and the description.

YOU SHOULD EXTRACT:
- Tecton classes and decorated functions (Entity, FeatureView, FeatureService, etc.)
- Tecton objects embedded in other objects (SnowflakeConfig, Attribute, Aggregate, etc.)
- Unit tests (describe explicitly as "Unit test" and then describe what it's testing)

Pay attention to import statements to identify Tecton objects.
Don't extract declarations that are commented out with # comments.

CRITICAL REQUIREMENTS FOR DESCRIPTIONS:
1. ALWAYS use comments above the declaration
2. ALWAYS use the actual `description` field from the code (e.g., FeatureView.description, Entity.description)
3. If neither description field nor comments exist, infer the business purpose from context and variable names
4. Focus on WHAT the component does for the business, in addition to technical details

EXAMPLES OF GOOD vs BAD descriptions:

GOOD:
- "Article interactions: aggregations of clicks, carts, orders on an article" (uses actual description field)
- "Unique sessions with article interactions over the past 30 days" (business meaning)
- "Distance in kilometers between transaction and user's home location, used for fraud detection"
- "Transaction request source schema" (from comments)

BAD:
- "article_sessions" (just the function name)
- "transactions_batch" (just the variable name)
- "ad_impressions_batch" (just the variable name)

For each declaration type, use the description parameter value AND comments if they exist, then also infer purpose from context.

Focus on extracting components that would be useful for someone learning Tecton or implementing similar features.

MORE EXAMPLES:

For example, with this code:
```
from tecton import Entity, FeatureTable, Attribute
from tecton.types import String, Timestamp, Int64, Field
from fraud.entities import user
from datetime import timedelta

features = [
    Attribute('user_login_count_7d', Int64),
    Attribute('user_login_count_30d', Int64),
]

user_login_counts = FeatureTable(
    name='user_login_counts',
    entities=[user],
    features=features,
    online=True,
    offline=True,
    ttl=timedelta(days=7),
    owner='demo-user@tecton.ai',
    tags={'release': 'production'},
    description='User login counts over time.',
    timestamp_field='timestamp'
)
```

The declarations would be:
[("FeatureTable", "User login counts over time.")]

In this code
```python
fraud_detection_feature_service = FeatureService(
    name='fraud_detection_feature_service',
    prevent_destroy=False,  # Set to True for production services to prevent accidental destructive changes or downtime.
    features=[
        transaction_amount_is_higher_than_average,
        user_transaction_amount_metrics,
        user_transaction_counts,
        user_distinct_merchant_transaction_count_30d,
        merchant_fraud_rate
    ]
)

minimal_fs = FeatureService(
    name='minimal_fs',
    features=[
        transaction_amount_is_high
    ]
)
```

The declarations would be:
[
    ("FeatureService", "Fraud detection feature service"),
    ("FeatureService", "Whether transaction amount is higher")
]

In this code:
```
import math

from ads.features.on_demand_feature_views.user_query_embedding_similarity import user_query_embedding_similarity


# Testing the 'user_query_embedding_similarity' feature which takes in request data ('query_embedding')
# and a precomputed feature ('user_embedding') as inputs
def test_user_query_embedding_similarity():
    request = {'query_embedding': [1.0, 1.0, 0.0]}
    user_embedding = {'user_embedding': [0.0, 1.0, 1.0]}

    actual = user_query_embedding_similarity.test_run(request=request, user_embedding=user_embedding)

    # Float comparison.
    expected = 0.5

    assert math.isclose(actual['cosine_similarity'], expected)
```

The declarations would be:
[("test", "Testing the 'user_query_embedding_similarity' feature which takes in request data ('query_embedding') and a precomputed feature ('user_embedding') as inputs")]

In this code
```python
from tecton import BatchSource, SnowflakeConfig
from tecton.types import Field, Int64, String, Timestamp, Array

gaming_user_batch = BatchSource(
    name="gaming_users",
    batch_config=SnowflakeConfig(
        database="VINCE_DEMO_DB",
        schema="PUBLIC",
        table="ONLINE_GAMING_USERS",
        url="https://<your-cluster>.<your-snowflake-region>.snowflakecomputing.com/",
        warehouse="COMPUTE_WH",
        timestamp_field='TIMESTAMP',
    ),
)
```
(Pay attention that SnowflakeConfig is a configuration object embedded in the BatchSource object; we also need to extract that.)

The declarations would be:
[("BatchSource", "Gaming users batch source"), ("SnowflakeConfig", "Gaming users batch source configuration")]

In this code:
```
# The following defines several sliding time window aggregations over a user's transaction amounts
@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode='pandas',
    batch_schedule=timedelta(days=1),  # Defines how frequently batch jobs are scheduled to ingest into the offline store
    features=[
        Aggregate(input_column=Field('amt', Float64), function='sum', time_window=timedelta(hours=1)),
        Aggregate(input_column=Field('amt', Float64), function='max', time_window=timedelta(days=1)),
        Aggregate(input_column=Field('amt', Float64), function='min', time_window=timedelta(days=3)),
        Aggregate(input_column=Field('amt', Float64), function=approx_percentile(percentile=0.5, precision=100), time_window=timedelta(hours=1))
    ],
    timestamp_field='timestamp',
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    tags={'release': 'production'},
    owner='demo-user@tecton.ai',
    description='Transaction amount statistics and total over a series of time windows, updated every 10 minutes.',
    aggregation_leading_edge=AggregationLeadingEdge.LATEST_EVENT_TIME
)
def user_transaction_amount_metrics(transactions):
    return transactions[['user_id', 'amt', 'timestamp']]
```

The declarations would be:
[("Aggregate", "sum of transaction amounts over the past hour"),
 ("Aggregate", "max of transaction amounts over the past day"),
 ("Aggregate", "min of transaction amounts over the past 3 days"),
 ("Aggregate", "50th percentile of transaction amounts over the past hour")]
"""
                },
                {
                    "role": "user",
                    "content": f"Extract meaningful Tecton declarations from this code. Use comments and description fields when available:\n\n{code}"
                }
            ],
            temperature=0,
            max_tokens=4000,
            timeout=45,
            response_format={
                "type": "json_schema",
                "json_schema": {
                    "name": "declarations",
                    "schema": {
                        "type": "object",
                        "properties": {
                            "declarations": {
                                "type": "array",
                                "items": {
                                    "type": "array",
                                    "items": {"type": "string"},
                                    "minItems": 2,
                                    "maxItems": 2
                                }
                            }
                        },
                        "required": ["declarations"],
                        "additionalProperties": False
                    }
                }
            }
        )

        result = json.loads(response.choices[0].message.content)
        declarations = result.get("declarations", [])

        # Return all declarations without validation filtering
        return declarations

    except Exception as e:
        print(f"Error extracting declarations: {e}")
        return []


def extract_declarations(directory_configs: List[DirectoryConfig]) -> List[Dict[str, Any]]:
    """Extract declarations from all Python files in the given directory configurations."""
    files_with_types = []
    for config in directory_configs:
        if os.path.exists(config.path):
            folder_files = get_py_files(config.path)
            for file_path in folder_files:
                files_with_types.append((file_path, config.type))
        else:
            print(f"Warning: Directory {config.path} does not exist, skipping...")

    print(f"Found {len(files_with_types)} Python files to process")

    res = []
    for i in tqdm.tqdm(range(len(files_with_types)), desc="Processing files"):
        file_path, directory_type = files_with_types[i]
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                code = f.read()

            # Skip empty files or files with very little content
            if len(code.strip()) < 50:
                continue

            declarations = extract_declarations_from_code(code)

            for declaration in declarations:
                if len(declaration) >= 2:
                    res.append({
                        "text": f"Example of {declaration[0]}. {declaration[1]}",
                        "code": code,
                        "file_path": file_path,
                        "directory_type": directory_type
                    })

        except Exception as e:
            print(f"Error processing file {file_path}: {e}")
            continue

    return res


def main():
    """Main function to generate examples parquet files."""
    # Define the directories to process with their types
    directory_configs = [
        DirectoryConfig("~/git/tecton-sample-repo/rift", DirectoryType.RIFT),
        DirectoryConfig("~/git/examples/Snowflake", DirectoryType.RIFT),
        DirectoryConfig("~/git/examples/Spark", DirectoryType.SPARK),
        DirectoryConfig("~/git/tecton-sample-repo/spark", DirectoryType.SPARK),
    ]

    print("Starting extraction of Tecton declarations...")
    print(f"Processing directories: {[config.path for config in directory_configs]}")

    # Extract all declarations with type information
    all_declarations = extract_declarations(directory_configs)

    if not all_declarations:
        print("No declarations found. Exiting.")
        return

    print(f"Extracted {len(all_declarations)} declarations")

    # Separate into Rift and Spark examples based on directory_type
    rift_examples = []
    spark_examples = []

    for declaration in all_declarations:
        example_data = {
            "text": declaration["text"],
            "code": declaration["code"]
        }
        if declaration["directory_type"] == DirectoryType.RIFT:
            rift_examples.append(example_data)
        else:
            spark_examples.append(example_data)

    print(f"Found {len(rift_examples)} Rift examples and {len(spark_examples)} Spark examples")

    # Create output directory
    output_dir = os.path.join(os.path.dirname(__file__), "..", "data")
    os.makedirs(output_dir, exist_ok=True)

    # Save Rift examples
    if rift_examples:
        rift_df = pd.DataFrame(rift_examples)
        rift_output_path = os.path.join(output_dir, "examples_rift.parquet")
        rift_df.to_parquet(rift_output_path, index=False)
        print(f"Saved {len(rift_examples)} Rift examples to {rift_output_path}")
    else:
        print("No Rift examples found")

    # Save Spark examples
    if spark_examples:
        spark_df = pd.DataFrame(spark_examples)
        spark_output_path = os.path.join(output_dir, "examples_spark.parquet")
        spark_df.to_parquet(spark_output_path, index=False)
        print(f"Saved {len(spark_examples)} Spark examples to {spark_output_path}")
    else:
        print("No Spark examples found")

    # Also save combined examples (for backward compatibility)
    combined_examples = rift_examples + spark_examples
    if combined_examples:
        combined_df = pd.DataFrame(combined_examples)
        combined_output_path = os.path.join(output_dir, "examples.parquet")
        combined_df.to_parquet(combined_output_path, index=False)
        print(f"Saved {len(combined_examples)} combined examples to {combined_output_path}")

    print("Example extraction completed successfully!")


if __name__ == "__main__":
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tecton-ai/tecton-mcp'
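
The same endpoint can also be called from Python; here is a small sketch using the requests library (the shape of the returned JSON is defined by the directory API and not shown here):

import requests

# Query the Glama MCP directory API for the Tecton MCP server entry
resp = requests.get("https://glama.ai/api/mcp/v1/servers/tecton-ai/tecton-mcp")
resp.raise_for_status()
print(resp.json())  # server metadata returned by the directory API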

If you have feedback or need assistance with the MCP directory API, please join our Discord server.