OmniMCP

by OpenAdaptAI
Verified
# omnimcp/omniparser/client.py """Client module for interacting with the OmniParser server.""" import base64 from typing import Optional, Dict, List from loguru import logger from PIL import Image, ImageDraw import boto3 # Need boto3 for the initial check import requests from .server import Deploy from ..config import config class OmniParserClient: """Client for interacting with the OmniParser server.""" def __init__(self, server_url: Optional[str] = None, auto_deploy: bool = True): """Initialize the OmniParser client. Args: server_url: URL of the OmniParser server. If None, will attempt to find or deploy a server. auto_deploy: Whether to automatically deploy a server if none is found. """ self.server_url = server_url self.auto_deploy = auto_deploy self._ensure_server() def _ensure_server(self) -> None: """Ensure a server is available, deploying one if necessary.""" if self.server_url: logger.info(f"Using provided server URL: {self.server_url}") else: logger.info("No server_url provided, attempting discovery/deployment...") # Try finding existing running instance first instance_ip = None instance_id = None try: ec2 = boto3.resource("ec2", region_name=config.AWS_REGION) instances = ec2.instances.filter( Filters=[ { "Name": "tag:Name", "Values": [config.PROJECT_NAME], }, # Use project name tag {"Name": "instance-state-name", "Values": ["running"]}, ] ) # Get the most recently launched running instance running_instances = sorted( list(instances), key=lambda i: i.launch_time, reverse=True ) instance = running_instances[0] if running_instances else None if instance and instance.public_ip_address: instance_ip = instance.public_ip_address instance_id = instance.id # Store ID too for logging maybe self.server_url = f"http://{instance_ip}:{config.PORT}" logger.success( f"Found existing running server instance {instance_id} at {self.server_url}" ) elif self.auto_deploy: logger.info( "No running server found, attempting auto-deployment via Deploy.start()..." ) # Call start and get the result directly deployer = Deploy() # Deploy.start now returns IP and ID instance_ip, instance_id = deployer.start() if instance_ip and instance_id: # Deployment succeeded, set the URL self.server_url = f"http://{instance_ip}:{config.PORT}" logger.success( f"Auto-deployment successful. Server URL: {self.server_url} (Instance ID: {instance_id})" ) else: # deployer.start() failed and returned None raise RuntimeError( "Auto-deployment failed (Deploy.start did not return valid IP/ID). Check server logs." ) else: # No running instance and auto_deploy is False raise RuntimeError( "No server URL provided, no running instance found, and auto_deploy is disabled." ) except Exception as e: logger.error( f"Error during server discovery/deployment: {e}", exc_info=True ) # Re-raise as a RuntimeError to be caught by the main script if needed raise RuntimeError(f"Server discovery/deployment failed: {e}") from e # Verify server is responsive (only if server_url is now set) if self.server_url: logger.info(f"Checking server responsiveness at {self.server_url}...") try: self._check_server() # This probes the URL logger.success(f"Server at {self.server_url} is responsive.") except Exception as check_err: logger.error(f"Server check failed for {self.server_url}: {check_err}") # Raise error - if we have a URL it should be responsive after deployment/discovery raise RuntimeError( f"Server at {self.server_url} failed responsiveness check." ) from check_err else: # Safety check - should not be reachable if logic above is correct raise RuntimeError("Critical error: Failed to obtain server URL.") def _check_server(self) -> None: """Check if the server is responsive.""" if not self.server_url: raise RuntimeError( "Cannot check server responsiveness, server_url is not set." ) try: # Increased timeout slightly response = requests.get(f"{self.server_url}/probe/", timeout=15) response.raise_for_status() # Raises HTTPError for bad responses (4xx or 5xx) # Check content if needed: assert response.json().get("message") == "..." except requests.exceptions.Timeout: logger.error( f"Timeout connecting to server probe endpoint: {self.server_url}/probe/" ) raise RuntimeError(f"Server probe timed out for {self.server_url}") except requests.exceptions.ConnectionError: logger.error( f"Connection error reaching server probe endpoint: {self.server_url}/probe/" ) raise RuntimeError(f"Server probe connection error for {self.server_url}") except requests.exceptions.RequestException as e: logger.error( f"Error during server probe request for {self.server_url}: {e}" ) raise RuntimeError(f"Server probe failed: {e}") from e def parse_image(self, image: Image.Image) -> Dict: """Parse an image using the OmniParser server. Args: image: PIL Image to parse Returns: Dict containing parsing results """ # Convert image to base64 image_bytes = self._image_to_base64(image) # Make request try: response = requests.post( f"{self.server_url}/parse/", json={"base64_image": image_bytes}, timeout=30, ) response.raise_for_status() return response.json() except Exception as e: return {"error": f"Failed to parse image: {e}"} @staticmethod def _image_to_base64(image: Image.Image) -> str: """Convert PIL Image to base64 string.""" import io buffered = io.BytesIO() image.save(buffered, format="PNG") return base64.b64encode(buffered.getvalue()).decode() def visualize_results( self, image: Image.Image, parsed_content: List[Dict] ) -> Image.Image: """Visualize parsing results on the image. Args: image: Original PIL Image parsed_content: List of parsed content with bounding boxes Returns: PIL Image with visualizations """ # Create copy of image viz_image = image.copy() draw = ImageDraw.Draw(viz_image) # Draw results for item in parsed_content: # Get coordinates x1, y1, x2, y2 = item["bbox"] x1 = int(x1 * image.width) y1 = int(y1 * image.height) x2 = int(x2 * image.width) y2 = int(y2 * image.height) # Draw box draw.rectangle([(x1, y1), (x2, y2)], outline="red", width=2) # Draw label label = item["content"] bbox = draw.textbbox((x1, y1), label) draw.rectangle(bbox, fill="white") draw.text((x1, y1), label, fill="red") return viz_image # Example usage: if __name__ == "__main__": # Create client (will auto-deploy if needed) client = OmniParserClient() # Parse an image image = Image.open("../OpenAdapt/tests/assets/excel.png") results = client.parse_image(image) # Visualize results if "error" not in results: viz_image = client.visualize_results(image, results["parsed_content_list"]) viz_image.show()