Skip to main content
Glama
i-dot-ai
by i-dot-ai
lambda_handler.py2.75 kB
import asyncio import logging import os from datetime import UTC, datetime, timedelta from typing import Any from parliament_mcp.cli import configure_logging, load_data from parliament_mcp.qdrant_helpers import get_async_qdrant_client from parliament_mcp.settings import ParliamentMCPSettings, settings # Configure logging logger = logging.getLogger(__name__) log_level = os.environ.get("LOG_LEVEL", "INFO") logger.setLevel(log_level) configure_logging(level=log_level) async def main(settings: ParliamentMCPSettings, from_date_str: str, to_date_str: str) -> None: """Main ingestion function that processes all data sources.""" logger.info("Ingesting Hansard data...") async with get_async_qdrant_client(settings) as qdrant_client: await load_data( qdrant_client=qdrant_client, settings=settings, source="hansard", from_date=from_date_str, to_date=to_date_str, ) logger.info("Hansard data ingestion complete.") logger.info("Ingesting Parliamentary Questions data...") await load_data( qdrant_client=qdrant_client, settings=settings, source="parliamentary-questions", from_date=from_date_str, to_date=to_date_str, ) logger.info("Parliamentary Questions data ingestion complete.") def handler(event: dict, _: Any) -> None: """ AWS Lambda handler function. This function is the entry point for the Lambda execution. It triggers the daily data ingestion for the Parliament MCP. Args: event (dict): Lambda event. Should be in the format: { "from_date": "2024-10-10", # Optional "to_date": "2024-10-12", # Optional } context (dict): Lambda context. Returns: None """ logger.info("Starting daily data ingestion...") try: utc_now = datetime.now(UTC) if "to_date" in event: to_date_str = event["to_date"] else: logger.info("No to_date provided, using default of today") to_date_str = utc_now.strftime("%Y-%m-%d") if "from_date" in event: from_date_str = event["from_date"] else: logger.info("No from_date provided, using default of 2 days ago") from_date_str = (utc_now - timedelta(days=2)).strftime("%Y-%m-%d") logger.info("Ingesting data from %s to %s", from_date_str, to_date_str) asyncio.run(main(settings, from_date_str, to_date_str)) logger.info("Daily data ingestion finished successfully.") except Exception: logger.exception("An error occurred during data ingestion") raise

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/i-dot-ai/parliament-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server