Skip to main content
Glama
Aryan-Jhaveri

Canada's Food Guide MCP Server

cnf.py — 20.1 kB
import requests
from bs4 import BeautifulSoup
import json
import time
from typing import Optional, List, Dict, Any
import logging
import os

# Configure logging with environment-based level control.
# getattr() takes a fallback so an unrecognized FOODGUIDE_LOG_LEVEL value
# degrades to ERROR instead of raising AttributeError at import time.
LOG_LEVEL = os.getenv('FOODGUIDE_LOG_LEVEL', 'ERROR').upper()
logging.basicConfig(level=getattr(logging, LOG_LEVEL, logging.ERROR))
logger = logging.getLogger(__name__)

# Core macronutrients for streamlined LLM-optimized analysis.
# Names must match the "Nutrient name" column of the CNF report table exactly.
CORE_MACRONUTRIENTS = [
    "Energy (kcal)",
    "Energy (kJ)",
    "Protein",
    "Total Fat",
    "Carbohydrate",
    "Fatty acids, saturated, total",
    "Fatty acids, monounsaturated, total",
    "Fatty acids, polyunsaturated, total",
    "Fatty acids, trans, total",
    "Dietary Fibre",
    "Sugars",
    "Sodium",
    "Cholesterol",
]


class NutrientFileScraper:
    """
    A class to scrape food nutrient information from the Canadian Nutrient File website.

    This class provides programmatic access to Health Canada's CNF database for
    searching foods and retrieving detailed nutrient profiles. Designed for MCP
    server integration with proper error handling and rate limiting.
    """

    BASE_URL = "https://food-nutrition.canada.ca/cnf-fce"

    def __init__(self, rate_limit: float = 1.0):
        """
        Initializes the scraper with a session object and standard browser headers.

        Args:
            rate_limit: Seconds to wait between requests to be respectful of the server
        """
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                "(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
            )
        })
        self._csrf_token: Optional[str] = None
        self.rate_limit = rate_limit
        self._last_request_time = 0.0

    def _rate_limit_wait(self) -> None:
        """Sleep just long enough to keep requests at least `rate_limit` seconds apart."""
        current_time = time.time()
        time_since_last = current_time - self._last_request_time
        if time_since_last < self.rate_limit:
            time.sleep(self.rate_limit - time_since_last)
        self._last_request_time = time.time()

    def _get_csrf_token(self, soup: BeautifulSoup) -> bool:
        """
        Find and store the CSRF token (`input[name=_csrf]`) from a parsed page.

        Returns:
            True if a token was found and stored, False otherwise.
        """
        try:
            csrf_tag = soup.find('input', {'name': '_csrf'})
            if csrf_tag and csrf_tag.has_attr('value'):
                self._csrf_token = csrf_tag['value']
                return True
            return False
        except Exception as e:
            logger.error("Error extracting CSRF token: %s", e)
            return False

    def search_food(self, food_name: str) -> Optional[List[Dict[str, str]]]:
        """
        Searches for a food by name in the CNF database.

        Args:
            food_name: Name of the food to search for (e.g., 'potatoes', 'honey')

        Returns:
            List of dictionaries with 'food_code' and 'food_name' keys, or None if error
        """
        if not food_name or not food_name.strip():
            logger.error("Food name cannot be empty")
            return None

        search_page_url = f"{self.BASE_URL}/newSearch"
        try:
            self._rate_limit_wait()
            logger.info("Searching CNF for: '%s'", food_name)

            # Get search page to obtain CSRF token
            get_response = self.session.get(search_page_url)
            get_response.raise_for_status()
            soup = BeautifulSoup(get_response.text, 'html.parser')
            if not self._get_csrf_token(soup):
                logger.error("Could not find CSRF token on search page")
                return None

            # Submit search request with DataTables parameters to get all results
            payload = {
                "foodName": food_name.strip(),
                "foodId": "",
                "_csrf": self._csrf_token,
                # DataTables parameters to show all results (not just default 10-25)
                "draw": "1",
                "start": "0",
                "length": "-1",  # -1 means "All" in DataTables dropdown
                "search[value]": "",
                "search[regex]": "false",
            }
            self._rate_limit_wait()
            post_response = self.session.post(f"{self.BASE_URL}/doSearch", data=payload)
            post_response.raise_for_status()

            # Parse search results: each result row is <td><a>code</a></td><td>name</td>
            results_soup = BeautifulSoup(post_response.text, 'html.parser')
            results = []
            for row in results_soup.find_all('tr'):
                cells = row.find_all('td')
                if len(cells) == 2 and cells[0].find('a'):
                    results.append({
                        "food_code": cells[0].find('a').text.strip(),
                        "food_name": cells[1].text.strip(),
                    })

            # Update CSRF token for subsequent requests
            self._get_csrf_token(results_soup)
            logger.info("Found %d food matches", len(results))
            return results
        except requests.exceptions.RequestException as e:
            logger.error("Network error during food search: %s", e)
            return None
        except Exception as e:
            logger.error("Unexpected error during food search: %s", e)
            return None

    def get_serving_info(self, food_code: str) -> tuple[Optional[Dict[str, Any]], Optional[str]]:
        """
        Retrieves available serving sizes and refuse information for a given food code.

        Args:
            food_code: CNF food code (e.g., '4941')

        Returns:
            Tuple of (serving_options_dict, refuse_info_string), or (None, None) if error
        """
        if not food_code or not food_code.strip():
            logger.error("Food code cannot be empty")
            return None, None

        serving_page_url = f"{self.BASE_URL}/serving-portion?id={food_code}"
        try:
            self._rate_limit_wait()
            logger.info("Getting serving info for food code: %s", food_code)
            response = self.session.get(serving_page_url)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            if not self._get_csrf_token(soup):
                logger.error("Could not find CSRF token on serving info page")
                return None, None

            # Extract ALL serving options (including unchecked ones)
            serving_options: Dict[str, Any] = {}

            # Look for the fieldset containing serving size options
            fieldset = soup.find('fieldset')
            if fieldset:
                # Find all serving size input checkboxes within the fieldset
                for opt in fieldset.find_all('input', {'name': 'selectedItems'}):
                    if opt.has_attr('value') and opt.has_attr('id'):
                        # Find the associated label using the 'for' attribute
                        label = soup.find('label', {'for': opt['id']})
                        if label:
                            serving_options[opt['value']] = {
                                'description': label.text.strip(),
                                'checked': opt.has_attr('checked'),
                                'value_id': opt['value'],
                            }

            # Fallback to original method if fieldset approach doesn't work
            # (values are plain description strings in this branch)
            if not serving_options:
                for opt in soup.find_all('input', {'name': 'selectedItems'}):
                    if opt.has_attr('value'):
                        label = opt.find_next('label')
                        if label:
                            serving_options[opt['value']] = label.text.strip()

            # Extract refuse information (inedible-portion note shown on the page)
            refuse_info = "Not found"
            refuse_div = soup.find('div', class_='well well-sm')
            if refuse_div and 'Refuse:' in refuse_div.text:
                refuse_info = ' '.join(refuse_div.text.strip().split())

            logger.info("Found %d serving options", len(serving_options))
            return serving_options, refuse_info
        except requests.exceptions.RequestException as e:
            logger.error("Network error getting serving info: %s", e)
            return None, None
        except Exception as e:
            logger.error("Unexpected error getting serving info: %s", e)
            return None, None

    def get_nutrient_profile(
        self,
        food_code: str,
        serving_options: Dict[str, Any],
        nutrient_filter: str = "all",
        preferred_units: Optional[List[str]] = None,
    ) -> Optional[Dict[str, Any]]:
        """
        Submits the form to generate the nutrient profile and scrapes the resulting table.

        Enhanced with EER-style filtering for LLM efficiency.

        Args:
            food_code: CNF food code
            serving_options: Dictionary of serving options from get_serving_info()
            nutrient_filter: "all" (default) or "macronutrients" for 91% data reduction
            preferred_units: List of units to filter (e.g. ["100g", "15ml", "tsp"])

        Returns:
            Dictionary with nutrient data (filtered if requested) and metadata,
            or None if error
        """
        if not food_code or not serving_options:
            logger.error("Food code and serving options are required")
            return None

        report_url = f"{self.BASE_URL}/report-rapport"
        # Keys are the serving ids regardless of whether values are the
        # new metadata dicts or the old plain strings.
        serving_keys = list(serving_options)

        payload = {
            "foodId": food_code,
            "selectedItems": serving_keys,  # Select all available servings
            "_csrf": self._csrf_token,
        }

        try:
            self._rate_limit_wait()
            logger.info("Getting nutrient profile for food code: %s", food_code)
            response = self.session.post(report_url, data=payload)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            # Find the nutrient report table
            table = soup.find('table', id='nutrReport')
            if not table:
                logger.error("Nutrient report table not found")
                return {"error": "Nutrient report table not found"}

            # Parse the complex nutrient table structure.
            # Collapse all runs of whitespace in headers to single spaces.
            headers: List[str] = []
            thead = table.find('thead')
            if thead:
                headers = [' '.join(th.text.split()) for th in thead.find_all('th')]

            nutrient_data: Dict[str, Any] = {}
            current_group = "General"

            # Process table body sections; 'active' rows are category headers,
            # all other rows are nutrient values under the current category.
            for body in table.find_all('tbody'):
                for row in body.find_all('tr'):
                    if 'active' in row.get('class', []):
                        current_group = row.th.text.strip() if row.th else "Unknown"
                        if current_group not in nutrient_data:
                            nutrient_data[current_group] = []
                    else:
                        cols = row.find_all(['th', 'td'])
                        if cols:
                            nutrient_entry = {}
                            for i, col in enumerate(cols):
                                if i < len(headers):
                                    nutrient_entry[headers[i]] = col.text.strip()
                            if current_group not in nutrient_data:
                                nutrient_data[current_group] = []
                            nutrient_data[current_group].append(nutrient_entry)

            logger.info(
                "Successfully parsed nutrient profile with %d categories",
                len(nutrient_data),
            )

            # Apply post-fetch filtering (EER-style approach).
            # Shallow copy is sufficient: the filters build new containers
            # and never mutate the original category lists.
            original_data = nutrient_data.copy()
            if nutrient_filter == "macronutrients":
                nutrient_data = self._filter_macronutrients_only(nutrient_data)
            if preferred_units:
                nutrient_data = self._filter_serving_units(nutrient_data, preferred_units)

            # Return with EER-style metadata
            return {
                "nutrient_data": nutrient_data,
                "filter_applied": nutrient_filter,
                "total_nutrients_found": self._count_nutrients(original_data),
                "filtered_nutrients_count": self._count_nutrients(nutrient_data),
                "serving_units_filtered": preferred_units or "all",
                "data_reduction_percentage": self._calculate_reduction_percentage(
                    original_data, nutrient_data
                ),
            }
        except requests.exceptions.RequestException as e:
            logger.error("Network error generating nutrient profile: %s", e)
            return None
        except Exception as e:
            logger.error("Unexpected error generating nutrient profile: %s", e)
            return None

    def get_complete_food_profile(self, food_code: str) -> Optional[Dict[str, Any]]:
        """
        Convenience method to get complete food information in one call.

        Args:
            food_code: CNF food code

        Returns:
            Dictionary with serving info and complete nutrient profile, or None if error
        """
        serving_options, refuse_info = self.get_serving_info(food_code)
        if not serving_options:
            logger.error("Could not get serving info for food code: %s", food_code)
            return None

        nutrient_profile = self.get_nutrient_profile(food_code, serving_options)
        if not nutrient_profile:
            logger.error("Could not get nutrient profile for food code: %s", food_code)
            return None

        return {
            "food_code": food_code,
            "serving_options": serving_options,
            "refuse_info": refuse_info,
            "nutrient_profile": nutrient_profile,
        }

    def search_and_get_profile(
        self, food_name: str, food_index: int = 0
    ) -> Optional[Dict[str, Any]]:
        """
        Convenience method to search for a food and get the complete profile
        for the first match.

        Args:
            food_name: Name of food to search for
            food_index: Index of search result to use (default: 0 for first match)

        Returns:
            Complete food profile dictionary, or None if error
        """
        search_results = self.search_food(food_name)
        if not search_results or len(search_results) <= food_index:
            logger.error(
                "No food found at index %d for search: %s", food_index, food_name
            )
            return None

        selected_food = search_results[food_index]
        food_code = selected_food['food_code']
        logger.info(
            "Selected food: %s (Code: %s)", selected_food['food_name'], food_code
        )

        profile = self.get_complete_food_profile(food_code)
        if profile:
            profile['selected_food'] = selected_food
            profile['search_results'] = search_results
        return profile

    def _filter_macronutrients_only(self, nutrient_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Filter nutrient data to include only core macronutrients - 91% data reduction.

        Args:
            nutrient_data: Complete nutrient data organized by category

        Returns:
            Filtered nutrient data containing only macronutrients
        """
        filtered_data: Dict[str, Any] = {}
        for category_name, nutrients in nutrient_data.items():
            if not isinstance(nutrients, list):
                continue
            filtered_nutrients = [
                nutrient
                for nutrient in nutrients
                if isinstance(nutrient, dict)
                and nutrient.get('Nutrient name', '').strip() in CORE_MACRONUTRIENTS
            ]
            # Only include categories that still have macronutrients
            if filtered_nutrients:
                filtered_data[category_name] = filtered_nutrients
        return filtered_data

    def _filter_serving_units(
        self, nutrient_data: Dict[str, Any], preferred_units: List[str]
    ) -> Dict[str, Any]:
        """
        Filter nutrient data to include only specified serving units.

        Args:
            nutrient_data: Nutrient data organized by category
            preferred_units: List of preferred units (e.g. ["100g", "15ml", "tsp"])

        Returns:
            Filtered nutrient data with only preferred serving units
        """
        filtered_data: Dict[str, Any] = {}
        for category_name, nutrients in nutrient_data.items():
            if not isinstance(nutrients, list):
                continue
            filtered_nutrients = []
            for nutrient in nutrients:
                if not isinstance(nutrient, dict):
                    continue
                # Create a filtered nutrient entry with only preferred serving columns
                filtered_nutrient = {}
                for key, value in nutrient.items():
                    if key in ('Nutrient name', 'Unit see footnote1'):
                        # Always keep the nutrient name and unit
                        filtered_nutrient[key] = value
                    elif key == 'Value per 100 g of edible portion':
                        # Always keep the baseline 100g value
                        filtered_nutrient[key] = value
                    else:
                        # Keep the column if it matches any preferred unit
                        for unit in preferred_units:
                            if unit.lower() in key.lower():
                                filtered_nutrient[key] = value
                                break
                if filtered_nutrient:
                    filtered_nutrients.append(filtered_nutrient)
            if filtered_nutrients:
                filtered_data[category_name] = filtered_nutrients
        return filtered_data

    def _count_nutrients(self, nutrient_data: Dict[str, Any]) -> int:
        """Count total number of nutrients in the data."""
        return sum(
            len(nutrients)
            for nutrients in nutrient_data.values()
            if isinstance(nutrients, list)
        )

    def _calculate_reduction_percentage(
        self, original_data: Dict[str, Any], filtered_data: Dict[str, Any]
    ) -> float:
        """Calculate the percentage reduction in data size (rounded to 1 decimal)."""
        original_count = self._count_nutrients(original_data)
        filtered_count = self._count_nutrients(filtered_data)
        if original_count == 0:
            return 0.0
        reduction = ((original_count - filtered_count) / original_count) * 100
        return round(reduction, 1)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Aryan-Jhaveri/mcp-foodguidecanada'

If you have feedback or need assistance with the MCP directory API, please join our Discord server