#!/usr/bin/env python3
"""End-to-end test script using real Nexos.ai API."""
import asyncio
import os
import sys
import tempfile
from pathlib import Path
from dotenv import load_dotenv
# Put the project root (the parent of this script's directory) at the front of
# sys.path so the function-level `Imagen_MCP` imports in the tests below can be
# resolved from there.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
# Load environment variables from the .env file in the project root; the
# direct-client test reads NEXOS_API_KEY from the environment via os.getenv.
load_dotenv(project_root / ".env")
async def test_generate_image():
    """Exercise the ``generate_image`` tool against the real API.

    Reserves a temporary ``.png`` path, asks the tool to render a simple
    prompt into it, prints the outcome, and always removes the temporary
    file afterwards.

    Returns:
        The ``success`` attribute of the result returned by the tool.
    """
    print("\n" + "=" * 60)
    print("TEST 1: Simple Image Generation")
    print("=" * 60)

    from Imagen_MCP.tools.generate_image import generate_image

    # Create a temporary file for the output. delete=False keeps the file on
    # disk once the handle is closed, so only its path is handed to the tool.
    with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
        output_path = f.name

    try:
        # Use internal model ID - the tool will convert to API ID automatically
        result = await generate_image(
            prompt="A simple red circle on a white background",
            output_path=output_path,
            model="imagen-4-fast",  # Internal ID - will be converted to "Imagen 4 Fast (Public)"
            size="1024x1024",  # Use supported size
        )

        print(f"Success: {result.success}")
        print(f"Model used: {result.model_used}")

        if result.success:
            if result.file_path:
                print(f"Image saved to: {result.file_path}")
            if result.file_size_bytes:
                print(f"File size: {result.file_size_bytes} bytes")
            if result.revised_prompt:
                print(f"Revised prompt: {result.revised_prompt}")
        else:
            print(f"Error: {result.error}")

        return result.success
    finally:
        # The placeholder was created with delete=False, so it must be removed
        # on every exit path (success, reported failure, or an exception);
        # cleaning up only on success would leave stray files in the temp dir.
        Path(output_path).unlink(missing_ok=True)
async def test_model_catalog():
    """Print a short summary of the model catalog resource.

    Fetches the catalog and the list of model IDs, prints the total count,
    the default model and the available IDs, then returns ``True``.
    """
    divider = "=" * 60
    print(f"\n{divider}")
    print("TEST 2: Model Catalog Resource")
    print(divider)

    from Imagen_MCP.resources.models import get_model_catalog, list_model_ids

    catalog = get_model_catalog()
    model_ids = list_model_ids()

    # Catalog fields are looked up one at a time, right before each is printed.
    for label, key in (
        ("Total models", "total_count"),
        ("Default model", "default_model"),
    ):
        print(f"{label}: {catalog[key]}")
    print(f"Available models: {model_ids}")

    return True
async def test_nexos_client_direct():
    """Send one image request straight through ``NexosClient``.

    Reads the API key from the ``NEXOS_API_KEY`` environment variable, builds
    a single-image request using the API-side model ID, sends it, and prints
    a summary of the response.

    Returns:
        ``True`` when the client call and the response summary complete,
        ``False`` when the API key is missing or the call raises.
    """
    print("\n" + "=" * 60)
    print("TEST 3: Direct Nexos.ai Client Test")
    print("=" * 60)

    from Imagen_MCP.services.nexos_client import NexosClient
    from Imagen_MCP.models.generation import GenerateImageRequest

    api_key = os.getenv("NEXOS_API_KEY")
    if not api_key:
        print("ERROR: NEXOS_API_KEY not set")
        return False

    # Test output often ends up in shared logs, so never echo most of the
    # secret: show only a short suffix (nothing at all for very short keys)
    # plus the length, which is enough to tell which key was picked up.
    key_hint = api_key[-4:] if len(api_key) > 8 else ""
    print(f"API Key: ...{key_hint} ({len(api_key)} chars)")

    client = NexosClient(api_key=api_key)

    # Use the actual API model ID for direct client test
    request = GenerateImageRequest(
        prompt="A blue square on a gray background",
        model="Imagen 4 Fast (Public)",  # Actual API model ID
        n=1,
        size="1024x1024",  # Use supported size
    )

    print("Sending request to Nexos.ai...")
    print(f" Prompt: {request.prompt}")
    print(f" Model: {request.model}")
    print(f" Size: {request.size}")

    # Broad catch is deliberate: this is the test boundary, and any failure
    # is reported with its exception type and turned into a failed result.
    try:
        response = await client.generate_image(request)
        print("Response received!")
        print(f" Created: {response.created}")
        print(f" Images count: {len(response.images)}")
        if response.images:
            img = response.images[0]
            if img.b64_json:
                print(f" Image data length: {len(img.b64_json)} chars")
            if img.url:
                print(f" Image URL: {img.url}")
            if img.revised_prompt:
                print(f" Revised prompt: {img.revised_prompt}")
        return True
    except Exception as e:
        print(f"ERROR: {type(e).__name__}: {e}")
        return False
async def main():
    """Run every end-to-end test, print a summary, and return an exit code.

    The model catalog test and the direct client test always run; the
    generate-image tool test runs only when the direct client test passed
    and is otherwise recorded as skipped.

    Returns:
        ``0`` when every executed test returned ``True``, otherwise ``1``.
    """
    divider = "=" * 60

    async def _guarded(label, test_fn):
        # Run one test coroutine; any exception is reported and counts as a failure.
        try:
            return await test_fn()
        except Exception as exc:
            print(f"ERROR in {label}: {exc}")
            return False

    print(divider)
    print("IMAGEN MCP SERVER - END-TO-END TESTS")
    print("Using real Nexos.ai API")
    print(divider)

    outcomes = {}

    # Model catalog
    outcomes["model_catalog"] = await _guarded("model_catalog", test_model_catalog)

    # Direct client
    outcomes["nexos_client"] = await _guarded("nexos_client", test_nexos_client_direct)

    # Generate image tool: pointless to attempt when the client itself failed
    if not outcomes.get("nexos_client"):
        print("\nSkipping generate_image test due to client failure")
        outcomes["generate_image"] = None
    else:
        outcomes["generate_image"] = await _guarded("generate_image", test_generate_image)

    # Summary
    print(f"\n{divider}")
    print("TEST SUMMARY")
    print(divider)

    for name, outcome in outcomes.items():
        if outcome is None:
            label = "SKIPPED"
        else:
            label = "PASSED" if outcome else "FAILED"
        print(f" {name}: {label}")

    # Skipped tests (None) do not count against the overall verdict.
    executed = [outcome for outcome in outcomes.values() if outcome is not None]
    all_passed = all(outcome is True for outcome in executed)
    verdict = "ALL TESTS PASSED" if all_passed else "SOME TESTS FAILED"
    print(f"\nOverall: {verdict}")

    return int(not all_passed)
if __name__ == "__main__":
    # Run the async entry point and use its return value as the process exit status.
    sys.exit(asyncio.run(main()))