# server.py (4.98 kB)
"""MCP Server for Grocery Search Agent."""
import asyncio
import json
import logging
from typing import Any, Dict, List, Optional, Sequence
import sys
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
CallToolRequest,
CallToolResult,
ListToolsRequest,
ListToolsResult,
Tool,
TextContent,
)
from .models import SearchRequest, SearchResponse
from .scraper import ScraperFactory
# Logging setup. The stream is spelled out as stderr (which is also what
# basicConfig picks when no stream is given) because main() hands stdout
# to the MCP transport.
logging.basicConfig(level=logging.INFO, stream=sys.stderr)
logger = logging.getLogger(__name__)

# Single MCP server instance; the tool handlers below register on it.
server = Server("grocery-search-mcp")
@server.list_tools()
async def list_tools() -> ListToolsResult:
    """Advertise the tools this server exposes.

    Returns:
        A ``ListToolsResult`` containing one tool, ``GroceryPrices.search``,
        whose input schema requires a ``query`` string and accepts an
        optional ``store`` (only ``"trader_joes"`` is listed).
    """
    # NOTE(review): some MCP SDK releases expect this handler to return a
    # plain list of Tool objects rather than a ListToolsResult — confirm
    # against the SDK version this project pins.
    query_property = {
        "type": "string",
        "description": "Food or product name to search for",
    }
    store_property = {
        "type": "string",
        "enum": ["trader_joes"],
        "default": "trader_joes",
        "description": "Store to search (currently only Trader Joe's supported)",
    }
    search_tool = Tool(
        name="GroceryPrices.search",
        description="Search for grocery items with price and nutritional information from various stores",
        inputSchema={
            "type": "object",
            "properties": {
                "query": query_property,
                "store": store_property,
            },
            "required": ["query"],
        },
    )
    return ListToolsResult(tools=[search_tool])
@server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> CallToolResult:
    """Route an incoming tool call to its handler.

    Args:
        name: Name of the tool the client wants to invoke.
        arguments: Raw argument mapping supplied with the call.

    Returns:
        The ``CallToolResult`` produced by the matching handler.

    Raises:
        ValueError: If ``name`` is not a tool this server provides.
    """
    # NOTE(review): whether the SDK accepts a CallToolResult from this
    # handler (vs. a sequence of content items) depends on the SDK
    # version — confirm against the pinned release.
    if name != "GroceryPrices.search":
        raise ValueError(f"Unknown tool: {name}")
    return await handle_grocery_search(arguments)
async def handle_grocery_search(arguments: Dict[str, Any]) -> CallToolResult:
    """Run a grocery search and package the outcome as an MCP tool result.

    The arguments are validated by constructing a ``SearchRequest``, a
    scraper for the requested store is obtained from ``ScraperFactory``,
    and the products it returns are wrapped in a ``SearchResponse`` and
    rendered to text.

    Args:
        arguments: Raw tool arguments; passed as keyword arguments to
            ``SearchRequest``.

    Returns:
        A ``CallToolResult`` with a single text item. On success it holds
        the formatted search results; if any ``Exception`` is raised along
        the way it holds an error message and ``isError`` is set to True.
    """
    try:
        # Parse and validate the request.
        request = SearchRequest(**arguments)
        logger.info(f"Searching for '{request.query}' at {request.store}")

        scraper = ScraperFactory.create_scraper(request.store)

        # The scraper is only needed for the search itself.
        async with scraper:
            products = await scraper.search(request.query)

        response = SearchResponse(
            products=products,
            query=request.query,
            store=request.store,
            total_found=len(products),
        )

        # Render the response as text for the MCP client.
        response_text = _format_search_response(response)
        return CallToolResult(
            content=[TextContent(type="text", text=response_text)]
        )
    except Exception as e:
        # Boundary handler: report the failure to the client instead of
        # propagating it. logger.exception records the traceback as well
        # as the message, so failures remain diagnosable from the logs.
        logger.exception("Error in grocery search: %s", e)
        error_message = f"Error searching for grocery items: {e}"
        return CallToolResult(
            content=[TextContent(type="text", text=error_message)],
            isError=True,
        )
def _format_search_response(response: "SearchResponse") -> str:
    """Render a search response as human-readable text.

    Args:
        response: The search response to render. Reads ``products``,
            ``query``, ``store``, ``total_found`` and ``stale``; for each
            product reads ``name``, ``brand``, ``price``, ``size``,
            ``protein_g``, ``calories``, ``protein_per_dollar`` and
            ``macros_missing``.

    Returns:
        A newline-joined string. When there are no products this is a
        single "No products found" sentence; otherwise a header line
        followed by one numbered entry per product, plus a trailing
        staleness warning when ``response.stale`` is truthy.
    """
    # "trader_joes" -> "Trader Joes"; computed once for both messages.
    store_label = response.store.replace("_", " ").title()

    if not response.products:
        return f"No products found for '{response.query}' at {store_label}"

    lines = [
        f"Found {response.total_found} products for '{response.query}' at {store_label}:",
        "",
    ]
    for position, product in enumerate(response.products, 1):
        lines.append(f"{position}. **{product.name}**")
        if product.brand:
            lines.append(f" Brand: {product.brand}")
        # Numeric fields are compared against None rather than tested for
        # truthiness so that a legitimate zero (e.g. 0 g protein or
        # 0 calories) is still reported instead of silently dropped.
        if product.price is not None:
            lines.append(f" Price: ${product.price:.2f}")
        if product.size:
            lines.append(f" Size: {product.size}")
        if product.protein_g is not None:
            lines.append(f" Protein: {product.protein_g}g")
        if product.calories is not None:
            lines.append(f" Calories: {product.calories}")
        if product.protein_per_dollar is not None:
            lines.append(f" Protein per $: {product.protein_per_dollar:.1f}g")
        if product.macros_missing:
            lines.append(" ⚠️ Nutritional info estimated")
        lines.append("")  # Empty line between products
    if response.stale:
        lines.append("⚠️ Some data may be stale")
    return "\n".join(lines)
async def main():
    """Start the MCP server and serve requests over stdio.

    Opens the stdio transport and runs the module-level ``server`` on the
    stream pair it provides, using the server's own initialization options.
    """
    logger.info("Starting Grocery Search MCP Server")

    # The transport yields the input stream first and the output stream second.
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options(),
        )
# Script entry point: run the async main() to completion on a new event loop.
# NOTE: this module uses package-relative imports (``from .models ...``), so
# it must be launched as part of its package (e.g. with ``python -m``) rather
# than as a standalone file.
if __name__ == "__main__":
    asyncio.run(main())