"""
Scrape Discover Lambda
Processes URLs from the discovery queue, extracts links, and adds
new discovered URLs back to the queue (recursive discovery).
Input event (SQS triggered):
{
"Records": [{
"body": "{\"job_id\": \"uuid\", \"url\": \"https://...\", \"depth\": 0}"
}]
}
Output:
{
"processed": 1,
"discovered": 5,
"skipped": 2
}
"""

import json
import logging
import os
import time
from datetime import UTC, datetime

import boto3
from botocore.exceptions import ClientError

from ragstack_common.appsync import publish_scrape_update
from ragstack_common.scraper import ScrapePage, ScrapeStatus, UrlStatus
from ragstack_common.scraper.discovery import (
    extract_links,
    filter_discovered_urls,
    normalize_url,
)
from ragstack_common.scraper.fetcher import HttpFetcher
from ragstack_common.scraper.models import ScrapeConfig
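
# Lambda's Python runtime attaches a default handler to the root logger,
# so configuring the level is sufficient here.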
logger = logging.getLogger()
logger.setLevel(os.environ.get("LOG_LEVEL", "INFO"))


def lambda_handler(event, context):
    """
    Main Lambda handler - processes discovery queue messages.
    """
    # Get environment variables
    jobs_table = os.environ.get("SCRAPE_JOBS_TABLE")
    urls_table = os.environ.get("SCRAPE_URLS_TABLE")
    discovery_queue_url = os.environ.get("SCRAPE_DISCOVERY_QUEUE_URL")
    processing_queue_url = os.environ.get("SCRAPE_PROCESSING_QUEUE_URL")
    request_delay_ms = int(os.environ.get("REQUEST_DELAY_MS", "500"))

    if not jobs_table:
        raise ValueError("SCRAPE_JOBS_TABLE environment variable required")
    if not urls_table:
        raise ValueError("SCRAPE_URLS_TABLE environment variable required")
    if not discovery_queue_url:
        raise ValueError("SCRAPE_DISCOVERY_QUEUE_URL environment variable required")
    if not processing_queue_url:
        raise ValueError("SCRAPE_PROCESSING_QUEUE_URL environment variable required")

    dynamodb = boto3.resource("dynamodb")
    jobs_tbl = dynamodb.Table(jobs_table)
    urls_tbl = dynamodb.Table(urls_table)
    sqs = boto3.client("sqs")

    processed = 0
    discovered = 0
    skipped = 0
    batch_item_failures = []
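    # Per-record failures are reported back to SQS so only the failed
    # messages are retried; this assumes the event source mapping enables
    # ReportBatchItemFailures (partial batch responses).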

    # Process SQS records
    for record in event.get("Records", []):
        try:
            message = json.loads(record["body"])
            job_id = message["job_id"]
            url = message["url"]
            depth = message.get("depth", 0)

            logger.info(f"Processing discovery: job={job_id}, url={url}, depth={depth}")

            # Check if job is still active
            job_response = jobs_tbl.get_item(Key={"job_id": job_id})
            job_item = job_response.get("Item")
            if not job_item:
                logger.warning(f"Job not found: {job_id}")
                continue

            job_status = job_item.get("status")
            if job_status in [
                ScrapeStatus.CANCELLED.value,
                ScrapeStatus.FAILED.value,
                ScrapeStatus.COMPLETED.value,
            ]:
                logger.info(f"Job {job_id} is {job_status}, skipping")
                skipped += 1
                continue

            # Normalize URL for deduplication
            normalized_url = normalize_url(url)

            # Skip URLs this job has already visited
            existing = urls_tbl.get_item(Key={"job_id": job_id, "url": normalized_url})
            if existing.get("Item"):
                logger.info(f"URL already visited: {normalized_url}")
                skipped += 1
                continue

            # Create page record (status PENDING until it is processed)
            page = ScrapePage(
                job_id=job_id,
                url=normalized_url,
                status=UrlStatus.PENDING,
                depth=depth,
            )
            page_data = page.to_dict()
            urls_tbl.put_item(Item=page_data)
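            # NOTE: the get_item/put_item pair above is check-then-act, so two
            # concurrent consumers could both pass the check. A conditional
            # write would make the dedup atomic, e.g. (a sketch, not the
            # current behavior):
            #     urls_tbl.put_item(
            #         Item=page_data,
            #         ConditionExpression="attribute_not_exists(job_id)",
            #     )
            # catching ClientError with code ConditionalCheckFailedException
            # and treating it as "already visited".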

            # Get job config
            config_data = job_item.get("config", {})
            config = ScrapeConfig.from_dict(config_data)
            base_url = job_item.get("base_url", url)

            # Fetch the page to extract links
            fetcher = HttpFetcher(
                delay_ms=request_delay_ms,
                cookies=config.cookies,
                headers=config.headers,
            )
            result = fetcher.fetch(normalized_url)
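            # The fetch result is expected to expose error, content, and
            # is_html (an assumption about HttpFetcher's contract in
            # ragstack_common.scraper.fetcher).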

            if result.error:
                logger.warning(f"Fetch failed during discovery: {normalized_url} - {result.error}")
                # Mark the URL as failed and skip it; the record is not
                # re-queued, so this URL will not be processed further
                urls_tbl.update_item(
                    Key={"job_id": job_id, "url": normalized_url},
                    UpdateExpression="SET #status = :status, #error = :err",
                    ExpressionAttributeNames={"#status": "status", "#error": "error"},
                    ExpressionAttributeValues={
                        ":status": UrlStatus.FAILED.value,
                        ":err": result.error,
                    },
                )
                # Update job failed count
                jobs_tbl.update_item(
                    Key={"job_id": job_id},
                    UpdateExpression="SET failed_count = failed_count + :inc, updated_at = :ts",
                    ExpressionAttributeValues={
                        ":inc": 1,
                        ":ts": datetime.now(UTC).isoformat(),
                    },
                )
                continue

            # Send URL to processing queue
            sqs.send_message(
                QueueUrl=processing_queue_url,
                MessageBody=json.dumps(
                    {
                        "job_id": job_id,
                        "url": normalized_url,
                        "depth": depth,
                    }
                ),
            )

            # Update job total URLs count
            jobs_tbl.update_item(
                Key={"job_id": job_id},
                UpdateExpression="SET total_urls = total_urls + :inc, updated_at = :ts",
                ExpressionAttributeValues={
                    ":inc": 1,
                    ":ts": datetime.now(UTC).isoformat(),
                },
            )
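            # The arithmetic in the update expression is atomic per item, but
            # it raises a ValidationException if total_urls is missing; this
            # assumes the counter was initialized when the job was created.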

            # Publish discovery progress update to subscribers
            graphql_endpoint = os.environ.get("GRAPHQL_ENDPOINT")
            current_total = int(job_item.get("total_urls", 0)) + 1
            publish_scrape_update(
                graphql_endpoint=graphql_endpoint,
                job_id=job_id,
                base_url=job_item.get("base_url", url),
                title=job_item.get("title") or job_item.get("base_url", url),
                status=job_item.get("status", ScrapeStatus.DISCOVERING.value),
                total_urls=current_total,
                processed_count=int(job_item.get("processed_count", 0)),
                failed_count=int(job_item.get("failed_count", 0)),
            )
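            # current_total is derived from the job_item read at the top of
            # the loop, so the published count can lag behind concurrent
            # invocations; it is a progress hint, not an exact figure.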

            # Extract and filter links if within depth limit
            max_depth = config.max_depth
            max_pages = config.max_pages
            if depth < max_depth and result.is_html:
                links = extract_links(result.content, normalized_url)

                # Track URLs we've already seen (including current URL).
                # DynamoDB provides cross-invocation dedup; this handles
                # within-batch duplicates.
                visited = {normalized_url}
                filtered = filter_discovered_urls(
                    urls=links,
                    base_url=base_url,
                    config=config,
                    visited=visited,
                )

                logger.info(f"Discovered {len(filtered)} new URLs from {normalized_url}")

                # Check max pages limit
                job_refresh = jobs_tbl.get_item(Key={"job_id": job_id})
                total_discovered = int(job_refresh.get("Item", {}).get("total_urls", 0))
                remaining = max_pages - total_discovered
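                # remaining is a soft cap: concurrent discovery workers can
                # each read the same total_urls before either enqueues, so the
                # job may slightly overshoot max_pages.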

                # Queue new URLs for discovery
                urls_to_queue = filtered[:remaining] if remaining > 0 else []
                for link in urls_to_queue:
                    sqs.send_message(
                        QueueUrl=discovery_queue_url,
                        MessageBody=json.dumps(
                            {
                                "job_id": job_id,
                                "url": link,
                                "depth": depth + 1,
                            }
                        ),
                    )
                    discovered += 1

                if remaining <= 0:
                    logger.info(f"Max pages limit ({max_pages}) reached for job {job_id}")

            processed += 1

            # Respect the rate limit between pages; this pause is in addition
            # to any delay HttpFetcher applies internally via delay_ms
            time.sleep(request_delay_ms / 1000.0)

        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "")
            logger.error(f"AWS error processing record: {error_code} - {e}")
            batch_item_failures.append({"itemIdentifier": record["messageId"]})
        except Exception as e:
            logger.error(f"Error processing record: {e}", exc_info=True)
            batch_item_failures.append({"itemIdentifier": record["messageId"]})

    return {
        "processed": processed,
        "discovered": discovered,
        "skipped": skipped,
        "batchItemFailures": batch_item_failures,
    }