"""
Scrape Process Lambda
Processes URLs from the processing queue, extracts content, converts it to
Markdown, and saves the result to the S3 data bucket under the content/ prefix.
Input event (SQS triggered):
{
"Records": [{
"body": "{\"job_id\": \"uuid\", \"url\": \"https://...\", \"depth\": 0}"
}]
}
Output:
{
"processed": 1,
"failed": 0,
"skipped": 0
}
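Environment variables:
    Required: SCRAPE_JOBS_TABLE, SCRAPE_URLS_TABLE, DATA_BUCKET.
    Also read: REQUEST_DELAY_MS (default 500), GRAPHQL_ENDPOINT, LOG_LEVEL (default INFO).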
"""
import hashlib
import json
import logging
import os
import re
from datetime import UTC, datetime
from urllib.parse import urlparse
import boto3
from botocore.exceptions import ClientError
from ragstack_common.appsync import publish_scrape_update
from ragstack_common.scraper import ScrapeStatus, UrlStatus
from ragstack_common.scraper.dedup import DeduplicationService
from ragstack_common.scraper.extractor import extract_content
from ragstack_common.scraper.fetcher import fetch_auto
from ragstack_common.scraper.models import ScrapeConfig
# Metadata JSON file for Bedrock KB ingestion
METADATA_CONTENT_TYPE = "application/json"
logger = logging.getLogger()
logger.setLevel(os.environ.get("LOG_LEVEL", "INFO"))
# Module-level AWS clients (reused across warm invocations)
dynamodb = boto3.resource("dynamodb")
s3 = boto3.client("s3")
def generate_url_slug(url: str) -> str:
"""
Generate a unique slug from a URL for S3 key naming.
Combines sanitized path with a hash suffix for uniqueness.
Args:
url: The full URL to generate a slug from.
Returns:
        A sanitized slug like "products_widgets_a1b2c3d4", or just the hash if
        the URL has no path.
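        URLs that differ only in query string or fragment share the path prefix
        but receive different hash suffixes.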
"""
parsed = urlparse(url)
path = parsed.path.strip("/")
# Sanitize path: keep only alphanumeric, replace others with underscore
sanitized = re.sub(r"[^a-zA-Z0-9]+", "_", path).strip("_")[:50]
# Add hash suffix for uniqueness (handles query strings, fragments, etc.)
url_hash = hashlib.sha256(url.encode()).hexdigest()[:8]
return f"{sanitized}_{url_hash}" if sanitized else url_hash
def build_scrape_metadata(
job_metadata: dict,
url: str,
job_id: str,
title: str,
) -> dict:
"""
Build combined metadata for a scraped page.
Combines job-level LLM metadata with deterministic fields.
Args:
job_metadata: LLM-extracted metadata from job (seed URL).
url: The scraped page URL.
job_id: The scrape job ID.
title: The page title.
Returns:
Combined metadata dictionary.
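        Example (deterministic keys shown; any job_metadata keys are merged in,
        values illustrative):
            {"source_url": "https://example.com/page", "source_domain": "example.com",
             "job_id": "<job uuid>", "content_type": "web_page", "title": "Example Page",
             "scraped_date": "2025-06-01"}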
"""
parsed = urlparse(url)
# Start with job metadata (semantic fields from seed URL)
metadata = dict(job_metadata) if job_metadata else {}
# Add/override with deterministic fields
metadata.update(
{
"source_url": url,
"source_domain": parsed.netloc,
"job_id": job_id,
"content_type": "web_page",
"title": title or "Untitled",
"scraped_date": datetime.now(UTC).strftime("%Y-%m-%d"),
}
)
return metadata
def write_metadata_file(
s3_client,
bucket: str,
content_key: str,
metadata: dict,
) -> str:
"""
Write metadata JSON file alongside content file.
Args:
s3_client: Boto3 S3 client.
bucket: S3 bucket name.
content_key: S3 key of the content file.
metadata: Metadata dictionary.
Returns:
S3 key of the metadata file.
"""
metadata_key = f"{content_key}.metadata.json"
metadata_content = {"metadataAttributes": metadata}
s3_client.put_object(
Bucket=bucket,
Key=metadata_key,
Body=json.dumps(metadata_content),
ContentType=METADATA_CONTENT_TYPE,
)
logger.info(f"Wrote metadata to s3://{bucket}/{metadata_key}")
return metadata_key
def sanitize_for_s3_metadata(value: str, max_length: int = 256) -> str:
"""Sanitize a string for S3 metadata (ASCII only, limited length)."""
if not value:
return "Untitled"
    # Drop non-ASCII characters (S3 user-defined metadata must be US-ASCII)
    ascii_value = value.encode("ascii", "ignore").decode("ascii")
    # Fall back to a placeholder if stripping removed everything
    ascii_value = ascii_value.strip() or "Untitled"
return ascii_value[:max_length]
def lambda_handler(event, context):
"""
Main Lambda handler - processes URLs and extracts content.
"""
# Get environment variables
jobs_table = os.environ.get("SCRAPE_JOBS_TABLE")
urls_table = os.environ.get("SCRAPE_URLS_TABLE")
data_bucket = os.environ.get("DATA_BUCKET")
request_delay_ms = int(os.environ.get("REQUEST_DELAY_MS", "500"))
if not jobs_table:
raise ValueError("SCRAPE_JOBS_TABLE environment variable required")
if not urls_table:
raise ValueError("SCRAPE_URLS_TABLE environment variable required")
if not data_bucket:
raise ValueError("DATA_BUCKET environment variable required")
jobs_tbl = dynamodb.Table(jobs_table)
urls_tbl = dynamodb.Table(urls_table)
# Initialize deduplication service
dedup = DeduplicationService(urls_table)
processed = 0
failed = 0
skipped = 0
# Process SQS records
for record in event.get("Records", []):
message = None
job_id = None
url = None
try:
message = json.loads(record["body"])
job_id = message["job_id"]
url = message["url"]
# depth is available for potential future use (e.g., prioritization)
_ = message.get("depth", 0)
logger.info(f"Processing URL: job={job_id}, url={url}")
# Check if job is still active
job_response = jobs_tbl.get_item(Key={"job_id": job_id})
job_item = job_response.get("Item")
if not job_item:
logger.warning(f"Job not found: {job_id}")
continue
job_status = job_item.get("status")
if job_status in [
ScrapeStatus.CANCELLED.value,
ScrapeStatus.FAILED.value,
]:
logger.info(f"Job {job_id} is {job_status}, skipping")
skipped += 1
continue
# Get job config and metadata
config_data = job_item.get("config", {})
config = ScrapeConfig.from_dict(config_data)
job_metadata = job_item.get("job_metadata", {})
# Update URL status to processing
urls_tbl.update_item(
Key={"job_id": job_id, "url": url},
UpdateExpression="SET #status = :status",
ExpressionAttributeNames={"#status": "status"},
ExpressionAttributeValues={":status": UrlStatus.PROCESSING.value},
)
# Fetch content with auto SPA detection
scrape_mode = config_data.get("scrape_mode", "auto")
force_playwright = scrape_mode == "full"
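            # scrape_mode "full" forces the Playwright (headless browser) path;
            # otherwise fetch_auto decides based on its SPA detection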
result = fetch_auto(
url,
cookies=config.cookies,
headers=config.headers,
force_playwright=force_playwright,
delay_ms=request_delay_ms,
)
            if result.error:
                raise RuntimeError(f"Fetch failed: {result.error}")
            if not result.is_html:
                raise RuntimeError(f"Not HTML content: {result.content_type}")
# Extract content and convert to markdown
extracted = extract_content(result.content, url)
# Check for content changes (deduplication) - skip if force_rescrape enabled
if not config.force_rescrape and not dedup.is_content_changed(url, extracted.markdown):
logger.info(f"Content unchanged, skipping: {url}")
urls_tbl.update_item(
Key={"job_id": job_id, "url": url},
UpdateExpression="SET #status = :status",
ExpressionAttributeNames={"#status": "status"},
ExpressionAttributeValues={":status": UrlStatus.SKIPPED.value},
)
# Still count as processed for job completion
jobs_tbl.update_item(
Key={"job_id": job_id},
UpdateExpression=(
"SET processed_count = processed_count + :one, updated_at = :ts"
),
ExpressionAttributeValues={
":one": 1,
":ts": datetime.now(UTC).isoformat(),
},
)
skipped += 1
continue
# Generate URL slug and S3 key (content/ prefix - bypasses input/ trigger)
url_slug = generate_url_slug(url)
s3_key = f"content/{job_id}/{url_slug}.md"
# Upload to data bucket (content/ prefix)
s3.put_object(
Bucket=data_bucket,
Key=s3_key,
Body=extracted.markdown.encode("utf-8"),
ContentType="text/markdown",
Metadata={
"source_url": url,
"job_id": job_id,
"url_slug": url_slug,
"title": sanitize_for_s3_metadata(extracted.title),
},
)
logger.info(f"Saved content to s3://{data_bucket}/{s3_key}")
# Write metadata file for Bedrock KB ingestion
# Combines job-level LLM metadata with deterministic fields
scrape_metadata = build_scrape_metadata(
job_metadata=job_metadata,
url=url,
job_id=job_id,
title=extracted.title,
)
write_metadata_file(s3, data_bucket, s3_key, scrape_metadata)
# Store hash for future deduplication
dedup.store_hash(job_id, url, extracted.markdown)
# Update URL record with S3 key info
urls_tbl.update_item(
Key={"job_id": job_id, "url": url},
UpdateExpression=(
"SET #status = :status, url_slug = :slug, s3_key = :s3_key, "
"title = :title, processed_at = :ts, content_hash = :hash"
),
ExpressionAttributeNames={"#status": "status"},
ExpressionAttributeValues={
":status": UrlStatus.COMPLETED.value,
":slug": url_slug,
":s3_key": s3_key,
":title": extracted.title,
":ts": datetime.now(UTC).isoformat(),
":hash": dedup.get_content_hash(extracted.markdown),
},
)
# Update job processed count
jobs_tbl.update_item(
Key={"job_id": job_id},
UpdateExpression="SET processed_count = processed_count + :one, updated_at = :ts",
ExpressionAttributeValues={
":one": 1,
":ts": datetime.now(UTC).isoformat(),
},
)
# Publish processing progress update to subscribers
graphql_endpoint = os.environ.get("GRAPHQL_ENDPOINT")
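            # job_item was read before the atomic increment above, so approximate
            # the new count locally instead of re-reading the table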
current_processed = int(job_item.get("processed_count", 0)) + 1
publish_scrape_update(
graphql_endpoint=graphql_endpoint,
job_id=job_id,
base_url=job_item.get("base_url", ""),
title=job_item.get("title") or job_item.get("base_url", ""),
status=job_item.get("status", ScrapeStatus.PROCESSING.value),
total_urls=int(job_item.get("total_urls", 0)),
processed_count=current_processed,
failed_count=int(job_item.get("failed_count", 0)),
)
processed += 1
logger.info(f"Processed: {url} -> {s3_key}")
except ClientError as e:
error_code = e.response.get("Error", {}).get("Code", "")
logger.error(f"AWS error processing record: {error_code} - {e}")
_mark_failed(jobs_tbl, urls_tbl, job_id, url, str(e))
failed += 1
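            # Re-raise so the SQS message is returned to the queue for retry
            # (and eventually dead-lettered, if a DLQ is configured)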
raise
except Exception as e:
logger.error(f"Error processing record: {e}", exc_info=True)
_mark_failed(jobs_tbl, urls_tbl, job_id, url, str(e))
failed += 1
raise
return {
"processed": processed,
"failed": failed,
"skipped": skipped,
}
def _mark_failed(jobs_tbl, urls_tbl, job_id: str | None, url: str | None, error: str):
"""Mark URL as failed and update job counters."""
if not job_id or not url:
return
try:
# Update URL status to failed
urls_tbl.update_item(
Key={"job_id": job_id, "url": url},
UpdateExpression="SET #status = :status, #error = :err",
ExpressionAttributeNames={"#status": "status", "#error": "error"},
ExpressionAttributeValues={
":status": UrlStatus.FAILED.value,
":err": error[:500], # Truncate for DynamoDB
},
)
# Update job failed count
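        # if_not_exists seeds an empty list so list_append succeeds on the first failure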
jobs_tbl.update_item(
Key={"job_id": job_id},
UpdateExpression=(
"SET failed_count = failed_count + :one, "
"failed_urls = list_append(if_not_exists(failed_urls, :empty), :url), "
"updated_at = :ts"
),
ExpressionAttributeValues={
":one": 1,
":empty": [],
":url": [{"url": url, "error": error[:200]}],
":ts": datetime.now(UTC).isoformat(),
},
)
except Exception as e:
logger.error(f"Failed to mark URL as failed: {e}")