from __future__ import annotations
import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager, suppress
from datetime import datetime
from typing import ClassVar, Literal
from fastapi import HTTPException
from nanoid import generate
from patchright.async_api import BrowserContext, Page, Playwright, async_playwright
from getgather.browser.profile import BrowserProfile
from getgather.browser.resource_blocker import configure_context
from getgather.logs import logger
# Alphabet used when generating session ids (see BrowserSession.__init__):
# digits 2-9 and lowercase letters, leaving out 0, 1, l and o
# (presumably to avoid look-alike characters -- confirm with the author).
FRIENDLY_CHARS: str = "23456789abcdefghijkmnpqrstuvwxyz"
class BrowserStartupError(HTTPException):
    """HTTP 503 error signalling that the browser could not be started.

    The error carries an ``X-No-Retry: true`` header.
    """

    def __init__(self, message: str):
        no_retry_headers = {"X-No-Retry": "true"}
        super().__init__(status_code=503, detail=message, headers=no_retry_headers)
class BrowserSession:
    """A browser session tied to a single browser profile.

    Sessions that completed ``start()`` are tracked per profile id in
    ``_sessions``. Calling ``BrowserSession(profile_id)`` while a session for
    that id is tracked returns the tracked instance instead of a new one.
    """

    # profile_id -> session that completed start()
    _sessions: ClassVar[dict[str, BrowserSession]] = {}
    # profile_id -> lock serializing start() calls for that profile.
    # NOTE: entries are never removed, so this grows with the number of
    # distinct profile ids seen by the process.
    _locks: ClassVar[dict[str, asyncio.Lock]] = defaultdict(asyncio.Lock)

    def __new__(cls, profile_id: str) -> BrowserSession:
        """Return the tracked session for ``profile_id`` or allocate a new instance."""
        tracked = cls._sessions.get(profile_id)
        if tracked is not None:
            return tracked
        return super().__new__(cls)

    def __init__(self, profile_id: str):
        """Initialise a fresh, not-yet-started session for ``profile_id``."""
        # __init__ runs again whenever __new__ hands back a tracked instance;
        # bail out so the state of the live session is not reset.
        if getattr(self, "_initialized", False):
            return
        self._initialized = True
        self.profile: BrowserProfile = BrowserProfile(id=profile_id)
        self._playwright: Playwright | None = None
        self._context: BrowserContext | None = None
        self.last_active_timestamp: datetime | None = None
        # 8-character id drawn from FRIENDLY_CHARS
        self.session_id = generate(FRIENDLY_CHARS, 8)
        self.total_event = 0

    @classmethod
    def get(cls, profile: BrowserProfile) -> BrowserSession:
        """Return the tracked session for ``profile`` or a new, unstarted one."""
        tracked = cls._sessions.get(profile.id)
        if tracked is not None:  # retrieve active session
            return tracked
        return BrowserSession(profile.id)  # create new session

    @classmethod
    def get_all_sessions(cls) -> list[BrowserSession]:
        """Return a list snapshot of all tracked sessions."""
        return list(cls._sessions.values())

    @property
    def context(self) -> BrowserContext:
        """The browser context; asserts that the session has been started."""
        assert self._context is not None, "Browser session not started"
        return self._context

    @property
    def playwright(self) -> Playwright:
        """The Playwright handle; asserts that the session has been started."""
        assert self._playwright is not None, "Browser session not started"
        return self._playwright

    def _update_last_active(self):
        """Update the last active timestamp for this session."""
        self.last_active_timestamp = datetime.now()

    async def new_page(self) -> Page:
        """Open a new page in the context and wrap its ``goto`` with retries."""
        logger.info(f"Creating new page in context with profile {self.profile.id}")
        self._update_last_active()
        page = await self.context.new_page()
        return add_retry_to_page_goto(page)

    async def page(self) -> Page:
        """Return the most recent page of the context, creating one if none exists."""
        # TODO: It's okay for now to return the last page. We may want to track all pages in the future.
        self._update_last_active()
        pages = self.context.pages
        if pages:
            logger.info(f"Returning existing page in context with profile {self.profile.id}")
            return pages[-1]
        return await self.new_page()

    async def start(self, debug_url: str | None = "https://ip.fly.dev/all") -> BrowserSession:
        """Start the browser for this profile and register the session.

        Args:
            debug_url: URL opened on the current page right after launch;
                pass ``None`` (or an empty string) to skip that navigation.

        Returns:
            The tracked session for this profile: ``self`` when this call
            performed the start, otherwise the already registered session.

        Raises:
            BrowserStartupError: if any startup step fails. Browser resources
                acquired so far are released and the session is not registered.
        """
        profile_id = self.profile.id
        tracked = BrowserSession._sessions.get(profile_id)
        if tracked is not None:
            # Session already started
            return tracked

        # prevent race condition when two requests try to start the same profile
        async with self._locks[profile_id]:
            tracked = BrowserSession._sessions.get(profile_id)
            if tracked is not None:
                # Session was started while we were waiting for the lock
                return tracked
            try:
                logger.info(
                    f"Starting new session with profile {self.profile.id}",
                    extra={"profile_id": self.profile.id},
                )
                self._playwright = await async_playwright().start()
                self._context = await self.profile.launch(
                    profile_id=self.profile.id, browser_type=self.playwright.chromium
                )
                await configure_context(self._context)
                if debug_url:
                    debug_page = await self.page()
                    await debug_page.goto(debug_url)
                # Intentionally create a new page to apply resources filtering (from blocklists)
                await self.new_page()

                # Register only once every startup step succeeded, so other
                # callers never get handed a half-started session.
                self._update_last_active()
                self._sessions[profile_id] = self
                logger.info(
                    f"Session {self.profile.id} registered in sessions with last_active_timestamp {self.last_active_timestamp}"
                )
                return self
            except Exception as e:
                logger.error(f"Error starting browser: {e}")
                # Do not leak the driver/browser of a failed start. The on-disk
                # profile is left alone here; only stop() cleans it up.
                await self._release_browser()
                raise BrowserStartupError(f"Failed to start browser: {e}") from e

    async def _close_context(self) -> None:
        """Close the browser owning the context, or the context itself if it has none."""
        context = self._context
        if context is None:
            return
        browser = context.browser
        if browser is not None:
            await browser.close()
        else:
            # No owning browser object is exposed for this context, so close
            # the context directly instead of skipping the close altogether.
            await context.close()

    async def _release_browser(self) -> None:
        """Best-effort release of the context and Playwright handle; errors are suppressed."""
        with suppress(Exception):
            await self._close_context()
        if self._playwright is not None:
            with suppress(Exception):
                await self._playwright.stop()
        self._context = None
        self._playwright = None

    async def stop(self):
        """Close the browser, stop Playwright, clean up the profile and untrack the session.

        Errors while closing the browser are logged and teardown continues.
        An exception raised by ``profile.cleanup`` propagates, but only after
        the session has been removed from tracking and its handles reset.
        """
        logger.info(
            "Closing browser",
            extra={
                "profile_id": self.profile.id,
            },
        )
        try:
            await self._close_context()
        except Exception as e:
            logger.error(f"Error closing browser; continuing teardown: {e}")
        finally:
            if self._playwright:
                with suppress(Exception):  # try or die kill playwright
                    await self.playwright.stop()
            try:
                # clean up local browser profile after playwright is stopped
                self.profile.cleanup(self.profile.id)
            finally:  # ensure we always remove session from tracking
                self._sessions.pop(self.profile.id, None)
                self._context = None
                self._playwright = None
@asynccontextmanager
async def browser_session(profile: BrowserProfile, *, nested: bool = False, stop_ok: bool = True):
    """Async context manager yielding the ``BrowserSession`` for ``profile``.

    Args:
        profile: Profile whose session is looked up via ``BrowserSession.get``.
        nested: When true, the session is neither started nor stopped here;
            the looked-up session is yielded as is.
        stop_ok: When false, the session is left running on exit.

    Yields:
        The session returned by ``start()``, or the looked-up session when
        ``nested`` is true.
    """
    manages_lifecycle = not nested
    session = BrowserSession.get(profile)
    if manages_lifecycle:
        session = await session.start()
    stop_on_exit = manages_lifecycle and stop_ok
    try:
        yield session
    finally:
        if stop_on_exit:
            await session.stop()
def add_retry_to_page_goto(page: Page, max_retries: int = 3) -> Page:
    """Replace ``page.goto`` with a wrapper that retries failed navigations.

    Args:
        page: Page whose ``goto`` attribute is replaced in place.
        max_retries: Total number of navigation attempts. Values below 1 are
            treated as 1 so that a navigation is always attempted.

    Returns:
        The same ``page`` object, now carrying the retrying ``goto``.
    """
    original_goto = page.goto
    # With zero attempts the wrapper would return None without navigating,
    # which is indistinguishable from a legitimate None result of goto.
    attempts = max(1, max_retries)

    async def goto_with_retry(
        url: str,
        *,
        timeout: float | None = None,
        wait_until: Literal["commit", "domcontentloaded", "load", "networkidle"] | None = None,
        referer: str | None = None,
    ):
        """Call the original ``goto``; retry immediately on any exception, re-raising the last one."""
        for attempt in range(1, attempts + 1):
            try:
                return await original_goto(
                    url, timeout=timeout, wait_until=wait_until, referer=referer
                )
            except Exception as error:
                msg = f"page.goto {url} {attempt} of {attempts} failed: {error}"
                if attempt < attempts:
                    logger.warning(msg + "\nRetrying...")
                else:
                    logger.exception(msg)
                    raise

    # setattr keeps the original's way of overriding the bound method on the instance
    setattr(page, "goto", goto_with_retry)
    return page