# scraper.py
"""Web scraper for grocery stores."""
import asyncio
import re
from typing import List, Optional, Dict, Any
import httpx
from bs4 import BeautifulSoup
from rapidfuzz import fuzz
import logging
from .models import GroceryProduct
logger = logging.getLogger(__name__)
class GroceryScraper:
    """Base class for grocery store scrapers.

    Subclasses implement :meth:`search` for a specific store. Instances
    must be used as an async context manager so the HTTP client is
    opened and closed correctly::

        async with SomeScraper() as scraper:
            products = await scraper.search("greek yogurt")
    """

    # Matches price-like numbers such as "$3.99" or "12.49"; thousands
    # separators are stripped before matching.
    _PRICE_RE = re.compile(r'\$?(\d+\.?\d*)')

    # Rough per-serving estimates used when a store page exposes no
    # nutrition facts: food keyword -> (protein in grams, calories).
    # Kept as one table so protein and calories cannot drift out of sync.
    _NUTRITION_ESTIMATES: Dict[str, tuple] = {
        'protein bar': (15.0, 200),
        'greek yogurt': (10.0, 100),
        'chicken breast': (25.0, 165),
        'salmon': (22.0, 206),
        'tofu': (8.0, 70),
        'eggs': (6.0, 155),
        'milk': (8.0, 150),
        'cheese': (25.0, 400),
        'nuts': (20.0, 600),
        'beans': (15.0, 245),
        'lentils': (18.0, 230),
    }

    def __init__(self):
        # The client is created lazily in __aenter__ so the object can be
        # constructed outside a running event loop.
        self.session = None
        self.rate_limit_delay = 1.0  # seconds between requests

    async def __aenter__(self):
        """Open the shared HTTP client."""
        self.session = httpx.AsyncClient(
            timeout=30.0,
            headers={
                # Browser-like UA; some store sites block default client UAs.
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
            }
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP client, if it was ever opened."""
        if self.session:
            await self.session.aclose()

    async def search(self, query: str) -> List["GroceryProduct"]:
        """Search for products matching *query*. Subclasses must override."""
        raise NotImplementedError

    def _extract_price(self, price_text: str) -> Optional[float]:
        """Extract the first price-like number from *price_text*.

        Returns the price as a float (e.g. ``"$1,234.56"`` -> ``1234.56``)
        or ``None`` when the text is empty or contains no number.
        """
        if not price_text:
            return None
        match = self._PRICE_RE.search(price_text.replace(',', ''))
        if not match:
            return None
        try:
            return float(match.group(1))
        except ValueError:
            return None

    def _extract_nutrition_estimate(self, product_name: str, description: str = "") -> Dict[str, Optional[float]]:
        """Estimate nutrition from a product's name and description.

        Fuzzy-matches the known food keywords against the combined text
        and returns a dict with keys ``protein_g``, ``calories`` and
        ``macros_missing``; the first two are ``None`` when nothing
        matches above the similarity threshold.
        """
        combined_text = f"{product_name} {description}".lower()
        best_protein = None
        best_calories = None
        best_score = 0
        for food_type, (protein, calories) in self._NUTRITION_ESTIMATES.items():
            score = fuzz.partial_ratio(food_type, combined_text)
            # Keep the closest keyword, but only above a 70% similarity floor.
            if score > best_score and score > 70:
                best_score = score
                best_protein = protein
                best_calories = calories
        return {
            'protein_g': best_protein,
            'calories': best_calories,
            'macros_missing': best_protein is None
        }
class TraderJoesScraper(GroceryScraper):
    """Scraper for Trader Joe's products."""

    BASE_URL = "https://www.traderjoes.com"
    SEARCH_URL = f"{BASE_URL}/home/search"

    async def search(self, query: str) -> List["GroceryProduct"]:
        """Search Trader Joe's for products matching *query*.

        Returns at most 10 products sorted by protein-per-dollar,
        best first; degrades to an empty list on any search failure.

        Raises:
            RuntimeError: if called outside the async context manager.
        """
        if not self.session:
            raise RuntimeError("Scraper not initialized. Use async context manager.")
        logger.info("Searching Trader Joe's for: %s", query)
        try:
            # Use web search to find Trader Joe's products since their site
            # may not have a direct API.
            search_query = f"site:traderjoes.com {query}"
            products = await self._search_via_web(search_query, query)
            # Best protein value first; unknown protein (None) sorts last.
            products.sort(key=lambda p: p.protein_per_dollar or 0, reverse=True)
            return products[:10]  # Return top 10 results
        except Exception:
            # Best-effort search: log the full traceback and return "no
            # results" rather than crashing the caller.
            logger.exception("Error searching Trader Joe's for %r", query)
            return []

    async def _search_via_web(self, search_query: str, original_query: str) -> List["GroceryProduct"]:
        """Build product results for *original_query* via web search.

        NOTE(review): currently fabricates mock products. A real
        implementation would integrate with a web search API or scrape
        search results using *search_query*.
        """
        products = []
        mock_products = [
            {
                "name": f"Trader Joe's {original_query.title()}",
                "brand": "Trader Joe's",
                "price": 4.99,
                "size": "12 oz",
                "store": "trader_joes"
            },
            {
                "name": f"Organic {original_query.title()}",
                "brand": "Trader Joe's",
                "price": 6.99,
                "size": "16 oz",
                "store": "trader_joes"
            }
        ]
        for product_data in mock_products:
            # Fill in nutrition from the keyword-based estimator (inherited).
            nutrition = self._extract_nutrition_estimate(product_data["name"])
            # Protein per dollar is only meaningful with both protein and a
            # non-zero price (truthiness also guards against division by 0).
            protein_per_dollar = None
            price = product_data.get('price')
            if nutrition['protein_g'] and price:
                protein_per_dollar = nutrition['protein_g'] / price
            product = GroceryProduct(
                name=product_data["name"],
                brand=product_data.get("brand"),
                price=price,
                size=product_data.get("size"),
                protein_g=nutrition['protein_g'],
                calories=nutrition['calories'],
                store=product_data["store"],
                protein_per_dollar=protein_per_dollar,
                macros_missing=nutrition['macros_missing']
            )
            products.append(product)
            await asyncio.sleep(self.rate_limit_delay)  # Rate limiting
        return products
class ScraperFactory:
    """Factory for creating appropriate scrapers."""

    @staticmethod
    def create_scraper(store: str) -> GroceryScraper:
        """Create a scraper for the specified store.

        Raises:
            ValueError: if *store* is not a supported store key.
        """
        if store != "trader_joes":
            raise ValueError(f"Unsupported store: {store}")
        return TraderJoesScraper()