Skip to main content
Glama
Aryan-Jhaveri

Canada's Food Guide MCP Server

sql_engine.py (13.3 kB)
"""
Simple SQL query engine for virtual nutrition tables.

This module provides a lightweight SQL-like query engine that operates on the
virtual session table data structures, enabling LLMs to write SQL queries for
nutrition analysis without requiring a full database.
"""

import ast
import logging
import operator
import re
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd

# Quoted SQL literal: '...' (with '' as an escaped quote) or "...".
_LITERAL_RE = re.compile(r"'(?:[^']|'')*'|\"[^\"]*\"")

# Keywords that start a top-level clause of a SELECT statement.
_CLAUSE_RE = re.compile(
    r"\b(SELECT|FROM|WHERE|GROUP\s+BY|HAVING|ORDER\s+BY|LIMIT)\b", re.IGNORECASE
)

# "[LEFT|RIGHT|INNER|FULL] [OUTER] JOIN table [[AS] alias] ON a.col = b.col".
# The ON condition is matched as exactly two column references so that a
# following WHERE / GROUP BY / ORDER BY clause is never swallowed into it.
_JOIN_RE = re.compile(
    r"\b(LEFT|RIGHT|INNER|FULL)?\s*(?:OUTER\s+)?JOIN\s+(\w+)"
    r"(?:\s+(?:AS\s+)?\w+)?\s+ON\s+([\w.]+)\s*=\s*([\w.]+)",
    re.IGNORECASE,
)
_JOIN_KINDS = {"LEFT": "left", "RIGHT": "right", "INNER": "inner", "FULL": "outer"}

# One "column <operator> literal" comparison inside WHERE or CASE WHEN.
_CONDITION_RE = re.compile(
    r"\s*([\w.]+)\s*(>=|<=|<>|!=|=|>|<|\bNOT\s+LIKE\b|\bLIKE\b)\s*"
    r"('(?:[^']|'')*'|\"[^\"]*\"|[^\s()]+)\s*",
    re.IGNORECASE,
)
_AND_RE = re.compile(r"AND\b", re.IGNORECASE)
_COMPARATORS = {
    "=": operator.eq,
    "!=": operator.ne,
    "<>": operator.ne,
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
}

# Trailing "AS alias" of a SELECT item (anchored at the end of the item).
_ALIAS_RE = re.compile(r"\s+AS\s+(\w+|\"[^\"]*\"|'[^']*')\s*$", re.IGNORECASE)

# SQL aggregate function name -> pandas aggregation name.
_AGGREGATES = {"SUM": "sum", "AVG": "mean", "COUNT": "count", "MIN": "min", "MAX": "max"}
_AGGREGATE_RE = re.compile(
    r"(SUM|AVG|COUNT|MIN|MAX)\s*\((.*)\)", re.IGNORECASE | re.DOTALL
)

_CASE_RE = re.compile(r"CASE\s+(.*\S)\s+END", re.IGNORECASE | re.DOTALL)
_CASE_KEYWORD_RE = re.compile(r"\b(WHEN|THEN|ELSE)\b", re.IGNORECASE)

# Whitelisted arithmetic for the expression evaluator (used instead of eval()).
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Mod: operator.mod,
}
_UNARY_OPS = {ast.USub: operator.neg, ast.UAdd: operator.pos}


def _sql_round(value: Any, digits: Any = 0) -> Any:
    """Round a scalar or a Series to *digits* decimal places (SQL ROUND)."""
    if isinstance(value, pd.Series):
        return value.round(int(digits))
    return round(value, int(digits))


# Scalar (row-wise) functions callable from expressions.
_SCALAR_FUNCTIONS = {"ROUND": _sql_round, "ABS": abs}


def _mask_literals(text: str) -> str:
    """Blank out the contents of quoted literals, keeping the text length.

    Keyword and delimiter searches run on the masked text so that words such
    as ``and`` or ``order by`` inside a string literal are never mistaken for
    SQL syntax; offsets stay valid for slicing the original text.
    """

    def blank(match: "re.Match[str]") -> str:
        literal = match.group(0)
        return literal[0] + "_" * (len(literal) - 2) + literal[-1]

    return _LITERAL_RE.sub(blank, text)


def _split_clauses(query: str) -> Dict[str, str]:
    """Split a SELECT statement into its top-level clauses.

    Returns:
        Mapping from the upper-case clause keyword (``"SELECT"``, ``"FROM"``,
        ``"WHERE"``, ``"GROUP BY"``, ``"HAVING"``, ``"ORDER BY"``,
        ``"LIMIT"``) to the stripped clause text. Only the first occurrence of
        each keyword is kept; sub-queries are not supported.
    """
    hits = list(_CLAUSE_RE.finditer(_mask_literals(query)))
    clauses: Dict[str, str] = {}
    for index, hit in enumerate(hits):
        name = " ".join(hit.group(1).upper().split())
        end = hits[index + 1].start() if index + 1 < len(hits) else len(query)
        clauses.setdefault(name, query[hit.end():end].strip())
    return clauses


def _split_top_level(text: str) -> List[str]:
    """Split *text* on commas that are outside parentheses and literals."""
    parts: List[str] = []
    depth = 0
    start = 0
    for index, char in enumerate(_mask_literals(text)):
        if char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
        elif char == "," and depth == 0:
            parts.append(text[start:index].strip())
            start = index + 1
    parts.append(text[start:].strip())
    return parts


def _match_aggregate(expression: str) -> Optional[Tuple[str, str]]:
    """Return ``(FUNCTION, inner_expression)`` if *expression* is one aggregate call.

    ``SUM(a) + SUM(b)`` is rejected (returns ``None``) because its outer
    parentheses do not belong to a single call.
    """
    match = _AGGREGATE_RE.fullmatch(expression.strip())
    if match is None:
        return None
    depth = 0
    for char in _mask_literals(match.group(2)):
        if char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
            if depth < 0:
                return None
    if depth != 0:
        return None
    return match.group(1).upper(), match.group(2).strip()


def _unquote(token: str) -> str:
    """Strip the surrounding quotes of a SQL literal and unescape ``''``."""
    if len(token) >= 2 and token[0] == token[-1] and token[0] in "'\"":
        inner = token[1:-1]
        return inner.replace("''", "'") if token[0] == "'" else inner
    return token


class VirtualSQLEngine:
    """
    Simple SQL query engine for virtual nutrition tables.

    Supports:
    - SELECT with column expressions and arithmetic (+, -, *, /, %, ROUND, ABS)
    - FROM with a table name (an optional table alias is accepted and ignored)
    - JOIN operations (INNER, LEFT, RIGHT, FULL) on one ``a.col = b.col`` equality
    - WHERE with comparisons (=, !=, <>, <, <=, >, >=, LIKE, NOT LIKE) joined by AND
    - GROUP BY and aggregate functions (SUM, AVG, COUNT, MIN, MAX)
    - ORDER BY with ASC / DESC, and LIMIT n
    - ``CASE WHEN <condition> THEN <value> [...] [ELSE <value>] END`` as a
      whole expression (e.g. for unit conversion)

    Limitations: table prefixes are stripped from column references, so when
    a joined table has a column name that already exists on the left side it
    is only reachable under its ``<name>_joined`` name. OR, IN, IS NULL,
    HAVING, DISTINCT and sub-queries are rejected with an error rather than
    silently ignored.
    """

    def __init__(self, session_data: Dict[str, Any]):
        self.session_data = session_data
        self.tables = self._prepare_tables()

    def _prepare_tables(self) -> Dict[str, pd.DataFrame]:
        """Convert virtual session data to pandas DataFrames for SQL processing."""
        tables: Dict[str, pd.DataFrame] = {}

        # These tables are stored as something pd.DataFrame accepts directly.
        for name in ("recipe_ingredients", "cnf_foods", "cnf_nutrients"):
            if name in self.session_data:
                tables[name] = pd.DataFrame(self.session_data[name])

        # Recipe metadata is stored as a mapping; each value becomes one row.
        if "recipes" in self.session_data:
            recipes_list = list(self.session_data["recipes"].values())
            if recipes_list:
                tables["recipes"] = pd.DataFrame(recipes_list)

        return tables

    def execute(self, query: str) -> Dict[str, Any]:
        """
        Execute a SQL-like query on the virtual tables.

        Args:
            query: SQL query string (a trailing ``;`` is allowed)

        Returns:
            Dict with query results (``success``, ``rows``, ``columns``,
            ``data``) or a single ``error`` key describing the failure.
            This method never raises.
        """
        try:
            # Normalize query; LLM-written SQL frequently ends with ';'.
            query = query.strip().rstrip(";").strip()

            if not query.upper().startswith("SELECT"):
                return {"error": "Only SELECT queries are supported"}

            result_df = self._execute_select(query)
            if result_df is None:
                return {"error": "Query execution failed"}

            return {
                "success": "Query executed successfully",
                "rows": len(result_df),
                "columns": list(result_df.columns),
                "data": result_df.to_dict("records"),
            }
        except Exception as e:  # top-level boundary: report, never crash the tool
            return {"error": f"Query execution failed: {str(e)}"}

    def _execute_select(self, query: str) -> Optional[pd.DataFrame]:
        """Execute a SELECT query using pandas operations.

        Pipeline: FROM -> JOIN -> WHERE -> SELECT (projection, aggregates and
        GROUP BY) -> ORDER BY -> LIMIT.

        Raises:
            ValueError: if the query has no FROM clause, names an unknown
                table or column, or uses an unsupported construct.
        """
        clauses = _split_clauses(query)
        if "HAVING" in clauses:
            raise ValueError("HAVING clauses are not supported")

        table_match = re.match(r"(\w+)", clauses.get("FROM", ""))
        if not table_match:
            raise ValueError("No FROM clause found")

        main_table = table_match.group(1)
        if main_table not in self.tables:
            raise ValueError(f"Table '{main_table}' not found")

        result_df = self.tables[main_table].copy()
        result_df = self._process_joins(query, result_df)
        result_df = self._process_where(query, result_df)
        # Aggregation needs the un-projected rows, so GROUP BY is handled
        # together with the SELECT list rather than after it.
        result_df = self._process_select(query, result_df)
        result_df = self._process_order_by(query, result_df)
        return self._process_limit(query, result_df)

    def _process_joins(self, query: str, df: pd.DataFrame) -> pd.DataFrame:
        """Process JOIN clauses in the query.

        Each join must use a single ``table1.col = table2.col`` condition.
        Clashing non-key column names from the joined table get the suffix
        ``_joined``.

        Raises:
            ValueError: if a joined table does not exist.
        """
        for match in _JOIN_RE.finditer(_mask_literals(query)):
            kind, table_name, left_ref, right_ref = match.groups()
            if table_name not in self.tables:
                raise ValueError(f"Table '{table_name}' not found")
            join_table = self.tables[table_name]

            left_col = left_ref.split(".")[-1]
            right_col = right_ref.split(".")[-1]
            # The ON condition may name the joined table's column first.
            written_order_ok = left_col in df.columns and right_col in join_table.columns
            swapped_order_ok = right_col in df.columns and left_col in join_table.columns
            if not written_order_ok and swapped_order_ok:
                left_col, right_col = right_col, left_col

            how = _JOIN_KINDS[(kind or "INNER").upper()]
            df = df.merge(
                join_table,
                left_on=left_col,
                right_on=right_col,
                how=how,
                suffixes=("", "_joined"),
            )
        return df

    def _process_where(self, query: str, df: pd.DataFrame) -> pd.DataFrame:
        """Process WHERE clause in the query.

        Raises:
            ValueError: if the clause uses an unsupported construct or an
                unknown column (instead of silently returning unfiltered rows).
        """
        where_clause = _split_clauses(query).get("WHERE")
        if not where_clause:
            return df
        return df[self._condition_mask(where_clause, df)]

    def _condition_mask(self, clause: str, df: pd.DataFrame) -> pd.Series:
        """Return a boolean mask for ``cond [AND cond ...]`` evaluated on *df*."""
        mask = pd.Series(True, index=df.index)
        position = 0
        while True:
            match = _CONDITION_RE.match(clause, position)
            if match is None:
                raise ValueError(
                    f"Unsupported condition: {clause[position:].strip()!r}"
                )
            mask = mask & self._compare(df, *match.groups())
            position = match.end()
            if position >= len(clause):
                return mask
            conjunction = _AND_RE.match(clause, position)
            if conjunction is None:
                raise ValueError(
                    "Unsupported condition (only AND-combined comparisons are "
                    f"supported): {clause[position:].strip()!r}"
                )
            position = conjunction.end()

    @staticmethod
    def _compare(df: pd.DataFrame, column: str, op: str, raw_value: str) -> pd.Series:
        """Evaluate one ``column <op> literal`` comparison as a boolean Series."""
        name = column.split(".")[-1]  # drop the table prefix
        if name not in df.columns:
            raise ValueError(f"Unknown column '{column}' in condition")
        series = df[name]
        op = " ".join(op.upper().split())
        text = _unquote(raw_value)

        if op in ("LIKE", "NOT LIKE"):
            # Translate SQL wildcards: % -> any run of characters, _ -> one.
            pattern = "".join(
                ".*" if char == "%" else "." if char == "_" else re.escape(char)
                for char in text
            )
            matched = (
                series.astype(str)
                .str.match("(?:" + pattern + r")\Z", flags=re.IGNORECASE | re.DOTALL)
                .astype(bool)
            )
            if op == "NOT LIKE":
                matched = ~matched
            return matched & series.notna()  # NULL never satisfies [NOT] LIKE

        value: Any = text
        # Compare numbers as numbers: the literal arrives as text, and a
        # numeric column never equals a string.
        if pd.api.types.is_numeric_dtype(series) and not pd.api.types.is_bool_dtype(series):
            try:
                value = float(text)
            except ValueError:
                pass
        return _COMPARATORS[op](series, value)

    def _process_select(self, query: str, df: pd.DataFrame) -> pd.DataFrame:
        """Process the SELECT list, including aggregates and GROUP BY.

        Without aggregates or GROUP BY this is a row-wise projection. With
        them, rows are grouped (or the whole table is treated as one group)
        and every aggregate is computed per group. For backward
        compatibility, a non-aggregated expression that is not a GROUP BY key
        is summed within each group.

        Raises:
            ValueError: for unknown columns, unsupported expressions, or a
                bare column mixed with aggregates when there is no GROUP BY.
        """
        select_clause = _split_clauses(query).get("SELECT")
        if select_clause is None:
            return df
        if select_clause == "*":
            return self._process_group_by(query, df)

        items = [self._parse_select_item(text) for text in _split_top_level(select_clause)]
        group_cols = self._group_columns(query, df)
        aggregates = [_match_aggregate(expression) for expression, _ in items]

        if not group_cols and not any(aggregates):
            columns: Dict[str, pd.Series] = {}
            for expression, alias in items:
                if expression == "*" or expression.endswith(".*"):
                    for name in df.columns:
                        columns[name] = df[name]
                else:
                    columns[alias] = self._evaluate_expression(expression, df)
            return pd.DataFrame(columns, index=df.index)

        # Build a working frame holding the group keys plus one row-level
        # input column per aggregated SELECT item.
        work = pd.DataFrame(index=df.index)
        for key in group_cols:
            work[key] = df[key]
        spec: Dict[str, str] = {}
        outputs: List[Tuple[str, str]] = []  # (alias, column in aggregated frame)
        for position, ((expression, alias), aggregate) in enumerate(zip(items, aggregates)):
            bare = expression.split(".")[-1]
            if aggregate is None and self._is_simple_column(expression, df) and bare in group_cols:
                outputs.append((alias, bare))
                continue
            if aggregate is None:
                if not group_cols:
                    raise ValueError(
                        f"'{expression}' must be aggregated or listed in GROUP BY"
                    )
                function, inner = "SUM", expression  # legacy implicit SUM
            else:
                function, inner = aggregate
            temp = f"__agg_{position}"
            work[temp] = self._aggregate_input(function, inner, df)
            spec[temp] = _AGGREGATES[function]
            outputs.append((alias, temp))

        if group_cols and spec:
            aggregated = work.groupby(group_cols, sort=True).agg(spec).reset_index()
        elif group_cols:
            aggregated = work.groupby(group_cols, sort=True).size().reset_index(name="__size")
        else:
            aggregated = pd.DataFrame(
                {temp: [work[temp].agg(function)] for temp, function in spec.items()}
            )
        return pd.DataFrame({alias: aggregated[source] for alias, source in outputs})

    @staticmethod
    def _parse_select_item(text: str) -> Tuple[str, str]:
        """Split one SELECT item into ``(expression, alias)``.

        Without an explicit ``AS alias`` the expression text itself is the
        output column name.
        """
        match = _ALIAS_RE.search(_mask_literals(text))
        if match is None:
            expression = text.strip()
            return expression, expression
        alias = text[match.start(1):match.end(1)].strip("\"'")
        return text[:match.start()].strip(), alias

    def _group_columns(self, query: str, df: pd.DataFrame) -> List[str]:
        """Return the GROUP BY column names (table prefixes removed).

        Raises:
            ValueError: if a GROUP BY column does not exist in *df*.
        """
        group_clause = _split_clauses(query).get("GROUP BY")
        if not group_clause:
            return []
        group_cols = [part.split(".")[-1] for part in _split_top_level(group_clause)]
        for name in group_cols:
            if name not in df.columns:
                raise ValueError(f"Unknown GROUP BY column '{name}'")
        return group_cols

    def _process_group_by(self, query: str, df: pd.DataFrame) -> pd.DataFrame:
        """Process GROUP BY for ``SELECT *``: sum every other column per group.

        Queries with an explicit SELECT list are grouped by
        ``_process_select``; this helper keeps the simple "group and sum"
        behaviour for ``SELECT * ... GROUP BY``. Without a GROUP BY clause
        *df* is returned unchanged.
        """
        group_cols = self._group_columns(query, df)
        if not group_cols:
            return df
        return df.groupby(group_cols).agg("sum").reset_index()

    def _process_order_by(self, query: str, df: pd.DataFrame) -> pd.DataFrame:
        """Process ORDER BY clause (``col [ASC|DESC], ...``).

        Sorting runs on the projected result, so each ORDER BY term must name
        an output column (by alias, by name, or by its table-qualified name).

        Raises:
            ValueError: if a term does not match any result column.
        """
        order_clause = _split_clauses(query).get("ORDER BY")
        if not order_clause:
            return df

        columns: List[Any] = []
        ascending: List[bool] = []
        for term in _split_top_level(order_clause):
            match = re.fullmatch(
                r"(.+?)(?:\s+(ASC|DESC))?", term, re.IGNORECASE | re.DOTALL
            )
            if match is None:
                raise ValueError("Empty ORDER BY term")
            columns.append(self._resolve_output_column(match.group(1).strip(), df))
            ascending.append((match.group(2) or "ASC").upper() != "DESC")
        return df.sort_values(columns, ascending=ascending, kind="mergesort")

    @staticmethod
    def _resolve_output_column(name: str, df: pd.DataFrame) -> Any:
        """Map an ORDER BY term to a column of *df*, ignoring table prefixes."""
        bare = name.split(".")[-1]
        for candidate in (name, bare):
            if candidate in df.columns:
                return candidate
        for column in df.columns:
            if str(column).split(".")[-1] == bare:
                return column
        raise ValueError(
            f"ORDER BY column '{name}' is not in the result; add it to the SELECT list"
        )

    def _process_limit(self, query: str, df: pd.DataFrame) -> pd.DataFrame:
        """Process ``LIMIT n`` (OFFSET is not supported)."""
        limit_clause = _split_clauses(query).get("LIMIT")
        if limit_clause is None:
            return df
        if not limit_clause.isdigit():
            raise ValueError(f"Unsupported LIMIT clause: {limit_clause!r}")
        return df.head(int(limit_clause))

    def _is_simple_column(self, expression: str, df: pd.DataFrame) -> bool:
        """Check if expression is a simple column reference."""
        col_name = expression.split(".")[-1] if "." in expression else expression
        has_operator = any(op in expression for op in ["+", "-", "*", "/", "(", ")"])
        return col_name in df.columns and not has_operator

    def _aggregate_input(self, function: str, inner: str, df: pd.DataFrame) -> pd.Series:
        """Return the row-level values an aggregate function operates on."""
        if function == "COUNT" and inner == "*":
            return pd.Series(1, index=df.index)  # COUNT(*) counts every row
        if _match_aggregate(inner) is not None:
            raise ValueError("Nested aggregate functions are not supported")
        return self._evaluate_expression(inner, df)

    def _evaluate_expression(self, expression: str, df: pd.DataFrame) -> pd.Series:
        """Evaluate a SELECT expression against *df*.

        Returns:
            A Series aligned with ``df.index`` for row-wise expressions, or a
            one-element Series for a whole-table aggregate such as
            ``SUM(amount)``.

        Raises:
            ValueError: if the expression references an unknown column or
                uses anything outside the supported, whitelisted syntax.
        """
        expression = expression.strip()

        if self._is_simple_column(expression, df):
            return df[expression.split(".")[-1]]

        aggregate = _match_aggregate(expression)
        if aggregate is not None:
            function, inner = aggregate
            values = self._aggregate_input(function, inner, df)
            return pd.Series([values.agg(_AGGREGATES[function])])

        case_match = _CASE_RE.fullmatch(expression)
        if case_match is not None:
            return self._evaluate_case(case_match.group(1), df)

        # Security: the query text comes from an LLM/user, so it is parsed
        # into an AST and only whitelisted nodes are evaluated - never eval().
        # Wrapping in parentheses lets the expression span several lines.
        try:
            tree = ast.parse(f"({expression})", mode="eval")
        except SyntaxError as exc:
            raise ValueError(f"Unsupported expression: {expression}") from exc
        result = self._evaluate_node(tree.body, df)
        if not isinstance(result, pd.Series):
            result = pd.Series(result, index=df.index)  # broadcast constants
        return result

    def _evaluate_node(self, node: ast.AST, df: pd.DataFrame) -> Any:
        """Recursively evaluate one whitelisted AST node."""
        if isinstance(node, ast.Constant):
            if isinstance(node.value, (int, float, str)) and not isinstance(node.value, bool):
                return node.value
        elif isinstance(node, ast.Name):
            return self._column(node.id, df)
        elif isinstance(node, ast.Attribute) and isinstance(node.value, ast.Name):
            return self._column(node.attr, df)  # table.column -> column
        elif isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
            left = self._evaluate_node(node.left, df)
            right = self._evaluate_node(node.right, df)
            return _BINARY_OPS[type(node.op)](left, right)
        elif isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](self._evaluate_node(node.operand, df))
        elif isinstance(node, ast.Call) and isinstance(node.func, ast.Name) and not node.keywords:
            function = _SCALAR_FUNCTIONS.get(node.func.id.upper())
            if function is not None:
                return function(*(self._evaluate_node(arg, df) for arg in node.args))
        raise ValueError(f"Unsupported expression element: {type(node).__name__}")

    @staticmethod
    def _column(name: str, df: pd.DataFrame) -> pd.Series:
        """Return column *name* of *df* or raise a descriptive ValueError."""
        if name not in df.columns:
            raise ValueError(f"Unknown column '{name}'")
        return df[name]

    def _evaluate_case(self, body: str, df: pd.DataFrame) -> pd.Series:
        """Evaluate the body of ``CASE WHEN c THEN v [WHEN ...] [ELSE v] END``.

        Rows matching no WHEN branch get the ELSE value, or NaN when there is
        no ELSE. Conditions use the same syntax as WHERE.
        """
        tokens = list(_CASE_KEYWORD_RE.finditer(_mask_literals(body)))
        if not tokens or tokens[0].start() != 0:
            raise ValueError(
                "Only 'CASE WHEN <condition> THEN <value> ... END' is supported"
            )
        parts: List[Tuple[str, str]] = []
        for index, token in enumerate(tokens):
            end = tokens[index + 1].start() if index + 1 < len(tokens) else len(body)
            parts.append((token.group(1).upper(), body[token.end():end].strip()))

        default: Optional[str] = None
        if parts[-1][0] == "ELSE":
            default = parts.pop()[1]
        if not parts or len(parts) % 2:
            raise ValueError("Malformed CASE expression")

        branches: List[Tuple[str, str]] = []
        for (when_kind, condition), (then_kind, outcome) in zip(parts[0::2], parts[1::2]):
            if (when_kind, then_kind) != ("WHEN", "THEN"):
                raise ValueError("Malformed CASE expression")
            branches.append((condition, outcome))

        if default is not None:
            result = self._evaluate_expression(default, df)
        else:
            result = pd.Series(float("nan"), index=df.index)
        # Apply branches last-to-first so the first matching WHEN wins.
        for condition, outcome in reversed(branches):
            mask = self._condition_mask(condition, df)
            result = self._evaluate_expression(outcome, df).where(mask, result)
        return result


def get_available_tables_info() -> Dict[str, Any]:
    """Get information about available virtual tables and their schemas.

    Returns:
        Mapping from table name to a dict with a ``description``, the list of
        ``columns`` and an ``example`` query.
    """
    return {
        "recipe_ingredients": {
            "description": "Recipe ingredient data with parsed amounts and units",
            "columns": [
                "ingredient_id (PK)",
                "recipe_id (FK)",
                "ingredient_name",
                "amount (float)",
                "unit (string)",
                "ingredient_order (int)",
                "cnf_food_code (FK)",
            ],
            "example": "SELECT ingredient_name, amount, unit FROM recipe_ingredients WHERE recipe_id = 'recipe1'",
        },
        "cnf_foods": {
            "description": "CNF food descriptions and metadata",
            "columns": [
                "cnf_food_code (PK)",
                "food_description",
            ],
            "example": "SELECT * FROM cnf_foods WHERE food_description LIKE '%honey%'",
        },
        "cnf_nutrients": {
            "description": "CNF nutrient values for different serving sizes",
            "columns": [
                "cnf_food_code (FK)",
                "nutrient_name",
                "nutrient_value (float)",
                "per_amount (float)",
                "unit (string)",
            ],
            "example": "SELECT * FROM cnf_nutrients WHERE nutrient_name = 'Energy (kcal)'",
        },
        "recipes": {
            "description": "Recipe metadata and details",
            "columns": [
                "recipe_id (PK)",
                "title",
                "base_servings (int)",
                "prep_time",
                "cook_time",
            ],
            "example": "SELECT title, base_servings FROM recipes",
        },
    }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Aryan-Jhaveri/mcp-foodguidecanada'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.