MCP Browser Use Server
by JovaniPink
- src
- mcp_browser_use
- utils
# -*- coding: utf-8 -*-
import base64
import logging
import os
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from langchain_anthropic import ChatAnthropic
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_ollama import ChatOllama
from langchain_openai import AzureChatOpenAI, ChatOpenAI
logger = logging.getLogger(__name__)
def get_llm_model(provider: str, **kwargs) -> Any:
"""
Return an initialized language model client based on the given provider name.
:param provider: The name of the LLM provider (e.g., "anthropic", "openai", "azure_openai").
:param kwargs: Additional parameters (model_name, temperature, base_url, api_key, etc.).
:return: An instance of a ChatLLM from the relevant langchain_* library.
:raises ValueError: If the provider is unsupported.
"""
if provider == "anthropic":
base_url = kwargs.get("base_url") or "https://api.anthropic.com"
api_key = kwargs.get("api_key") or os.getenv("ANTHROPIC_API_KEY", "")
return ChatAnthropic(
model_name=kwargs.get("model_name", "claude-3-5-sonnet-20240620"),
temperature=kwargs.get("temperature", 0.0),
base_url=base_url,
api_key=api_key,
)
elif provider == "openai":
base_url = kwargs.get("base_url") or os.getenv(
"OPENAI_ENDPOINT", "https://api.openai.com/v1"
)
api_key = kwargs.get("api_key") or os.getenv("OPENAI_API_KEY", "")
return ChatOpenAI(
model=kwargs.get("model_name", "gpt-4o"),
temperature=kwargs.get("temperature", 0.0),
base_url=base_url,
api_key=api_key,
)
elif provider == "deepseek":
base_url = kwargs.get("base_url") or os.getenv("DEEPSEEK_ENDPOINT", "")
api_key = kwargs.get("api_key") or os.getenv("DEEPSEEK_API_KEY", "")
return ChatOpenAI(
model=kwargs.get("model_name", "deepseek-chat"),
temperature=kwargs.get("temperature", 0.0),
base_url=base_url,
api_key=api_key,
)
elif provider == "gemini":
api_key = kwargs.get("api_key") or os.getenv("GOOGLE_API_KEY", "")
return ChatGoogleGenerativeAI(
model=kwargs.get("model_name", "gemini-2.0-flash-exp"),
temperature=kwargs.get("temperature", 0.0),
google_api_key=api_key,
)
elif provider == "ollama":
return ChatOllama(
model=kwargs.get("model_name", "phi4"),
temperature=kwargs.get("temperature", 0.0),
num_ctx=128000,
base_url=kwargs.get("base_url", "http://localhost:11434"),
)
elif provider == "azure_openai":
base_url = kwargs.get("base_url") or os.getenv("AZURE_OPENAI_ENDPOINT", "")
api_key = kwargs.get("api_key") or os.getenv("AZURE_OPENAI_API_KEY", "")
return AzureChatOpenAI(
model=kwargs.get("model_name", "gpt-4o"),
temperature=kwargs.get("temperature", 0.0),
api_version="2024-05-01-preview",
azure_endpoint=base_url,
api_key=api_key,
)
else:
raise ValueError(f"Unsupported provider: {provider}")
# Commonly used model names for quick reference
model_names = {
"anthropic": ["claude-3-5-sonnet-20240620", "claude-3-opus-20240229"],
"openai": ["gpt-4o", "gpt-4", "gpt-3.5-turbo"],
"deepseek": ["deepseek-chat"],
"gemini": [
"gemini-2.0-flash-exp",
"gemini-2.0-flash-thinking-exp",
"gemini-1.5-flash-latest",
"gemini-1.5-flash-8b-latest",
"gemini-2.0-flash-thinking-exp-1219",
],
"ollama": ["deepseek-r1:671b", "qwen2.5:7b", "llama3.3", "phi4"],
"azure_openai": ["gpt-4o", "gpt-4", "gpt-3.5-turbo"],
}
def encode_image(img_path: Optional[str]) -> Optional[str]:
"""
Convert an image at `img_path` into a base64-encoded string.
Returns None if `img_path` is None or empty.
Raises FileNotFoundError if the file doesn't exist.
"""
if not img_path:
return None
try:
with open(img_path, "rb") as fin:
image_data = base64.b64encode(fin.read()).decode("utf-8")
return image_data
except FileNotFoundError as e:
logger.error(f"Image not found at path {img_path}: {e}")
raise
except Exception as e:
logger.error(f"Error encoding image at {img_path}: {e}")
raise
def get_latest_files(
directory: str, file_types: List[str] = [".webm", ".zip"]
) -> Dict[str, Optional[str]]:
"""
Find the latest file for each extension in `file_types` under `directory`.
Returns a dict {file_extension: latest_file_path or None}.
:param directory: The directory to search.
:param file_types: List of file extensions (e.g., [".webm", ".zip"]).
:return: dict mapping each extension to the path of the newest file or None if not found.
"""
latest_files: Dict[str, Optional[str]] = {ext: None for ext in file_types}
if not os.path.exists(directory):
logger.debug(f"Directory '{directory}' does not exist. Creating it.")
os.makedirs(directory, exist_ok=True)
return latest_files
for file_type in file_types:
try:
matches = list(Path(directory).rglob(f"*{file_type}"))
if matches:
# Sort or use max() by modified time
latest = max(matches, key=lambda p: p.stat().st_mtime)
# Check if file is not actively being written
if time.time() - latest.stat().st_mtime > 1.0:
latest_files[file_type] = str(latest)
else:
logger.debug(
f"Skipping file {latest} - possibly still being written."
)
except Exception as e:
logger.error(f"Error getting latest {file_type} file in '{directory}': {e}")
return latest_files
async def capture_screenshot(browser_context) -> Optional[str]:
"""
Capture a screenshot of the first open page (not 'about:blank') in the browser_context,
returning a base64-encoded string, or None if no page is available.
:param browser_context: The browser context which should hold a reference to the playwright browser.
:return: Base64-encoded JPEG screenshot or None on failure.
"""
playwright_browser = getattr(browser_context.browser, "playwright_browser", None)
if not playwright_browser or not playwright_browser.contexts:
logger.debug("No available playwright_browser or contexts.")
return None
# Use the first Playwright context
playwright_context = playwright_browser.contexts[0]
if not playwright_context:
logger.debug("No valid playwright_context found.")
return None
# Find the first non-blank page
pages = playwright_context.pages
if not pages:
logger.debug("No pages available in playwright_context.")
return None
active_page = pages[0]
for page in pages:
if page.url != "about:blank":
active_page = page
break
try:
screenshot = await active_page.screenshot(type="jpeg", quality=75, scale="css")
encoded = base64.b64encode(screenshot).decode("utf-8")
return encoded
except Exception as e:
logger.error(f"Failed to capture screenshot: {e}")
return None