Skip to main content
Glama

OpenSCAD MCP Server

by jhacksman
cuda_mvs_client.py16.1 kB
""" Client for remote CUDA Multi-View Stereo processing. This module provides a client to connect to a remote CUDA MVS server within the LAN for processing multi-view images into 3D models. """ import os import json import logging import requests import base64 from typing import Dict, List, Optional, Any, Union from pathlib import Path import uuid # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class CUDAMVSClient: """ Client for connecting to a remote CUDA Multi-View Stereo server. This client handles: 1. Discovering available CUDA MVS servers on the LAN 2. Uploading images to the server 3. Requesting 3D reconstruction 4. Downloading the resulting 3D models 5. Monitoring job status """ def __init__( self, server_url: Optional[str] = None, api_key: Optional[str] = None, output_dir: str = "output/models", discovery_port: int = 8765, connection_timeout: int = 10, upload_chunk_size: int = 1024 * 1024, # 1MB chunks ): """ Initialize the CUDA MVS client. Args: server_url: URL of the CUDA MVS server (if known) api_key: API key for authentication (if required) output_dir: Directory to save downloaded models discovery_port: Port used for server discovery connection_timeout: Timeout for server connections in seconds upload_chunk_size: Chunk size for file uploads in bytes """ self.server_url = server_url self.api_key = api_key self.output_dir = output_dir self.discovery_port = discovery_port self.connection_timeout = connection_timeout self.upload_chunk_size = upload_chunk_size # Create output directory if it doesn't exist os.makedirs(output_dir, exist_ok=True) # Initialize session for connection pooling self.session = requests.Session() if api_key: self.session.headers.update({"Authorization": f"Bearer {api_key}"}) def discover_servers(self) -> List[Dict[str, Any]]: """ Discover CUDA MVS servers on the local network. Returns: List of dictionaries containing server information: [ { "server_id": "unique-server-id", "name": "CUDA MVS Server 1", "url": "http://192.168.1.100:8765", "capabilities": { "max_images": 50, "max_resolution": 4096, "supported_formats": ["jpg", "png"], "gpu_info": "NVIDIA RTX 4090 24GB" }, "status": "available" }, ... ] """ import socket import json from zeroconf import ServiceBrowser, ServiceListener, Zeroconf discovered_servers = [] class CUDAMVSListener(ServiceListener): def add_service(self, zc, type_, name): info = zc.get_service_info(type_, name) if info: server_info = { "server_id": name.split('.')[0], "name": info.properties.get(b'name', b'Unknown').decode('utf-8'), "url": f"http://{socket.inet_ntoa(info.addresses[0])}:{info.port}", "capabilities": json.loads(info.properties.get(b'capabilities', b'{}').decode('utf-8')), "status": "available" } discovered_servers.append(server_info) logger.info(f"Discovered CUDA MVS server: {server_info['name']} at {server_info['url']}") try: zeroconf = Zeroconf() listener = CUDAMVSListener() browser = ServiceBrowser(zeroconf, "_cudamvs._tcp.local.", listener) # Wait for discovery (non-blocking in production code) import time time.sleep(2) # Give some time for discovery zeroconf.close() return discovered_servers except Exception as e: logger.error(f"Error discovering CUDA MVS servers: {e}") return [] def test_connection(self, server_url: Optional[str] = None) -> Dict[str, Any]: """ Test connection to a CUDA MVS server. Args: server_url: URL of the server to test (uses self.server_url if None) Returns: Dictionary with connection status and server information """ url = server_url or self.server_url if not url: return {"status": "error", "message": "No server URL provided"} try: response = self.session.get( f"{url}/api/status", timeout=self.connection_timeout ) if response.status_code == 200: return { "status": "success", "server_info": response.json(), "latency_ms": response.elapsed.total_seconds() * 1000 } else: return { "status": "error", "message": f"Server returned status code {response.status_code}", "details": response.text } except requests.exceptions.RequestException as e: return {"status": "error", "message": f"Connection error: {str(e)}"} def upload_images(self, image_paths: List[str]) -> Dict[str, Any]: """ Upload images to the CUDA MVS server. Args: image_paths: List of paths to images to upload Returns: Dictionary with upload status and job information """ if not self.server_url: return {"status": "error", "message": "No server URL configured"} # Create a new job try: response = self.session.post( f"{self.server_url}/api/jobs", json={"num_images": len(image_paths)}, timeout=self.connection_timeout ) if response.status_code != 201: return { "status": "error", "message": f"Failed to create job: {response.status_code}", "details": response.text } job_info = response.json() job_id = job_info["job_id"] # Upload each image for i, image_path in enumerate(image_paths): # Check if file exists if not os.path.exists(image_path): return { "status": "error", "message": f"Image file not found: {image_path}" } # Get file size for progress tracking file_size = os.path.getsize(image_path) # Prepare upload with open(image_path, "rb") as f: files = { "file": (os.path.basename(image_path), f, "image/jpeg" if image_path.endswith(".jpg") else "image/png") } metadata = { "image_index": i, "total_images": len(image_paths), "filename": os.path.basename(image_path) } response = self.session.post( f"{self.server_url}/api/jobs/{job_id}/images", files=files, data={"metadata": json.dumps(metadata)}, timeout=None # No timeout for uploads ) if response.status_code != 200: return { "status": "error", "message": f"Failed to upload image {i+1}/{len(image_paths)}: {response.status_code}", "details": response.text } logger.info(f"Uploaded image {i+1}/{len(image_paths)}: {os.path.basename(image_path)}") # Start processing response = self.session.post( f"{self.server_url}/api/jobs/{job_id}/process", timeout=self.connection_timeout ) if response.status_code != 202: return { "status": "error", "message": f"Failed to start processing: {response.status_code}", "details": response.text } return { "status": "success", "job_id": job_id, "message": f"Uploaded {len(image_paths)} images and started processing", "job_url": f"{self.server_url}/api/jobs/{job_id}" } except requests.exceptions.RequestException as e: return {"status": "error", "message": f"Upload error: {str(e)}"} def get_job_status(self, job_id: str) -> Dict[str, Any]: """ Get the status of a CUDA MVS job. Args: job_id: ID of the job to check Returns: Dictionary with job status information """ if not self.server_url: return {"status": "error", "message": "No server URL configured"} try: response = self.session.get( f"{self.server_url}/api/jobs/{job_id}", timeout=self.connection_timeout ) if response.status_code == 200: return { "status": "success", "job_info": response.json() } else: return { "status": "error", "message": f"Failed to get job status: {response.status_code}", "details": response.text } except requests.exceptions.RequestException as e: return {"status": "error", "message": f"Connection error: {str(e)}"} def download_model(self, job_id: str, output_format: str = "obj") -> Dict[str, Any]: """ Download a processed 3D model from the CUDA MVS server. Args: job_id: ID of the job to download output_format: Format of the model to download (obj, ply, etc.) Returns: Dictionary with download status and local file path """ if not self.server_url: return {"status": "error", "message": "No server URL configured"} # Check job status first status_result = self.get_job_status(job_id) if status_result["status"] != "success": return status_result job_info = status_result["job_info"] if job_info["status"] != "completed": return { "status": "error", "message": f"Job is not completed yet. Current status: {job_info['status']}", "job_info": job_info } # Download the model try: response = self.session.get( f"{self.server_url}/api/jobs/{job_id}/model?format={output_format}", stream=True, timeout=None # No timeout for downloads ) if response.status_code != 200: return { "status": "error", "message": f"Failed to download model: {response.status_code}", "details": response.text } # Create a unique filename model_id = job_info.get("model_id", str(uuid.uuid4())) output_path = os.path.join(self.output_dir, f"{model_id}.{output_format}") # Save the file with open(output_path, "wb") as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) logger.info(f"Downloaded model to {output_path}") return { "status": "success", "model_id": model_id, "local_path": output_path, "format": output_format, "job_id": job_id } except requests.exceptions.RequestException as e: return {"status": "error", "message": f"Download error: {str(e)}"} def cancel_job(self, job_id: str) -> Dict[str, Any]: """ Cancel a running CUDA MVS job. Args: job_id: ID of the job to cancel Returns: Dictionary with cancellation status """ if not self.server_url: return {"status": "error", "message": "No server URL configured"} try: response = self.session.delete( f"{self.server_url}/api/jobs/{job_id}", timeout=self.connection_timeout ) if response.status_code == 200: return { "status": "success", "message": "Job cancelled successfully" } else: return { "status": "error", "message": f"Failed to cancel job: {response.status_code}", "details": response.text } except requests.exceptions.RequestException as e: return {"status": "error", "message": f"Connection error: {str(e)}"} def generate_model_from_images( self, image_paths: List[str], output_format: str = "obj", wait_for_completion: bool = True, poll_interval: int = 5 ) -> Dict[str, Any]: """ Complete workflow to generate a 3D model from images. Args: image_paths: List of paths to images output_format: Format of the output model wait_for_completion: Whether to wait for job completion poll_interval: Interval in seconds to poll for job status Returns: Dictionary with job status and model information if completed """ # Upload images and start processing upload_result = self.upload_images(image_paths) if upload_result["status"] != "success": return upload_result job_id = upload_result["job_id"] # If not waiting for completion, return the job info if not wait_for_completion: return upload_result # Poll for job completion import time while True: status_result = self.get_job_status(job_id) if status_result["status"] != "success": return status_result job_info = status_result["job_info"] if job_info["status"] == "completed": # Download the model return self.download_model(job_id, output_format) elif job_info["status"] == "failed": return { "status": "error", "message": "Job processing failed", "job_info": job_info } # Wait before polling again time.sleep(poll_interval) logger.info(f"Job {job_id} status: {job_info['status']}, progress: {job_info.get('progress', 0)}%")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jhacksman/OpenSCAD-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server