#!/usr/bin/env python3
"""End-to-end tests for all MCP server endpoints using the real Nexos.ai API.

This script tests all MCP tools and resources:
- Tools: generate_image, start_image_batch, get_next_image, get_batch_status,
  list_models, get_model_details
- Resources: models://image-generation, models://image-generation/{model_id}

NEXOS_API_KEY must be set in the environment or in the project's .env file.
"""
import asyncio
import os
import sys
import tempfile
from pathlib import Path
from dotenv import load_dotenv
# Add project root to path so project packages can be imported when this file
# is run directly. The root is taken to be the parent of the directory that
# contains this file (NOTE(review): assumes the script lives one level below
# the repository root — confirm).
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
# Load environment variables (e.g. NEXOS_API_KEY, which main() reads) from the
# .env file located in the project root.
load_dotenv(project_root / ".env")
class TestResults:
"""Track test results."""
def __init__(self):
self.results: dict[str, bool | None] = {}
self.errors: dict[str, str] = {}
def record(self, name: str, passed: bool, error: str | None = None):
self.results[name] = passed
if error:
self.errors[name] = error
def skip(self, name: str, reason: str):
self.results[name] = None
self.errors[name] = f"SKIPPED: {reason}"
def summary(self) -> tuple[int, int, int]:
"""Return (passed, failed, skipped) counts."""
passed = sum(1 for v in self.results.values() if v is True)
failed = sum(1 for v in self.results.values() if v is False)
skipped = sum(1 for v in self.results.values() if v is None)
return passed, failed, skipped
# Module-wide collector that every test function in this file reports to.
results = TestResults()
# ============================================================================
# RESOURCE AND MODEL REGISTRY TESTS (no API calls)
# ============================================================================
def test_resource_model_catalog():
    """Validate the catalog behind the models://image-generation resource.

    Checks the top-level keys, the minimum model count, the default model and
    the per-model keys, then records the outcome in ``results``.

    Returns:
        True when every check passes, False otherwise.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("TEST: Resource - models://image-generation")
    print(rule)
    from Imagen_MCP.resources.models import get_model_catalog

    try:
        catalog = get_model_catalog()

        # Top-level structure.
        for key in ("models", "total_count", "default_model"):
            assert key in catalog, f"Missing '{key}' key"

        # Content expectations.
        assert catalog["total_count"] >= 5, (
            f"Expected at least 5 models, got {catalog['total_count']}"
        )
        assert catalog["default_model"] == "imagen-4", (
            f"Expected default model 'imagen-4', got {catalog['default_model']}"
        )

        # Every catalog entry must expose the same set of keys.
        for entry in catalog["models"]:
            for field in ("id", "name", "capabilities", "use_cases"):
                assert field in entry, f"Model missing '{field}'"

        print(f"✓ Catalog has {catalog['total_count']} models")
        print(f"✓ Default model: {catalog['default_model']}")
        listed_ids = [entry["id"] for entry in catalog["models"]]
        print(f"✓ Model IDs: {listed_ids}")
        results.record("resource_model_catalog", True)
        return True
    except Exception as exc:
        print(f"✗ Error: {exc}")
        results.record("resource_model_catalog", False, str(exc))
        return False
def test_resource_model_details():
    """Validate the lookup behind models://image-generation/{model_id}.

    Looks up a known model id and an unknown one, checking that the former
    yields a populated info mapping and the latter yields ``None``.

    Returns:
        True when every check passes, False otherwise.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("TEST: Resource - models://image-generation/{model_id}")
    print(rule)
    from Imagen_MCP.resources.models import get_model_info, list_model_ids

    try:
        # Known model.
        known_ids = list_model_ids()
        assert len(known_ids) > 0, "No model IDs returned"

        target = "imagen-4-fast"
        details = get_model_info(target)
        assert details is not None, f"Model info for {target} is None"
        assert details["id"] == target, "Model ID mismatch"
        for key in ("name", "capabilities", "api_id"):
            assert key in details, f"Missing '{key}'"

        print(f"✓ Model info for '{target}':")
        print(f" - Name: {details['name']}")
        print(f" - API ID: {details['api_id']}")
        print(f" - Max images: {details['capabilities']['max_images_per_request']}")

        # Unknown model.
        missing = get_model_info("invalid-model-xyz")
        assert missing is None, "Expected None for invalid model"
        print("✓ Returns None for invalid model")

        results.record("resource_model_details", True)
        return True
    except Exception as exc:
        print(f"✗ Error: {exc}")
        results.record("resource_model_details", False, str(exc))
        return False
def test_tool_list_models():
    """Rebuild the list_models payload from the model registry and check it.

    The payload is assembled here from ``get_model_registry()`` rather than by
    calling the tool itself.

    Returns:
        True when every check passes, False otherwise.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("TEST: Tool - list_models")
    print(rule)
    from Imagen_MCP.services.model_registry import get_model_registry

    try:
        registry = get_model_registry()
        models = [
            {
                "id": entry.id,
                "name": entry.name,
                "provider": entry.provider,
                "description": entry.description,
            }
            for entry in registry.get_all_models()
        ]
        # NOTE(review): default_model is hard-coded in this payload, so the
        # default-model assertion below can never fail.
        result = {
            "models": models,
            "total_count": len(models),
            "default_model": "imagen-4",
        }

        # Structure checks.
        assert result["total_count"] >= 5, (
            f"Expected at least 5 models, got {result['total_count']}"
        )
        assert result["default_model"] == "imagen-4", "Wrong default model"

        # Every model the server is expected to offer must be listed.
        model_ids = [item["id"] for item in result["models"]]
        for wanted in (
            "imagen-4",
            "imagen-4-fast",
            "imagen-4-ultra",
            "dall-e-3",
            "gpt-image-1",
        ):
            assert wanted in model_ids, f"Missing model: {wanted}"

        print(f"✓ Total models: {result['total_count']}")
        print(f"✓ Default model: {result['default_model']}")
        print(f"✓ Model IDs: {model_ids}")
        results.record("tool_list_models", True)
        return True
    except Exception as exc:
        print(f"✗ Error: {exc}")
        results.record("tool_list_models", False, str(exc))
        return False
def test_tool_get_model_details():
    """Check registry lookups that back the get_model_details tool.

    Verifies the id, API id and provider of "imagen-4-fast" and that an
    unknown id resolves to ``None``.

    Returns:
        True when every check passes, False otherwise.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("TEST: Tool - get_model_details")
    print(rule)
    from Imagen_MCP.services.model_registry import get_model_registry

    try:
        registry = get_model_registry()

        # Known model.
        wanted_id = "imagen-4-fast"
        found = registry.get_model(wanted_id)
        assert found is not None, f"Model {wanted_id} not found"
        assert found.id == wanted_id, "Model ID mismatch"
        assert found.api_id == "Imagen 4 Fast (Public)", f"Wrong API ID: {found.api_id}"
        assert found.provider == "Google", f"Wrong provider: {found.provider}"
        print(f"✓ Model: {found.name}")
        print(f"✓ API ID: {found.api_id}")
        print(f"✓ Provider: {found.provider}")
        print(f"✓ Description: {found.description[:50]}...")

        # Unknown model.
        unknown = registry.get_model("invalid-model-xyz")
        assert unknown is None, "Expected None for invalid model"
        print("✓ Returns None for invalid model")

        results.record("tool_get_model_details", True)
        return True
    except Exception as exc:
        print(f"✗ Error: {exc}")
        results.record("tool_get_model_details", False, str(exc))
        return False
# ============================================================================
# TOOL TESTS (async; some call the real API)
# ============================================================================
async def test_tool_generate_image():
    """Test the generate_image tool against the real API.

    Generates one image into a temporary file and checks the reported model,
    path and size. Every file the test created is removed afterwards, whether
    the test passed or failed.

    Returns:
        True when every check passes, False otherwise.
    """
    print("\n" + "=" * 60)
    print("TEST: Tool - generate_image")
    print("=" * 60)
    from Imagen_MCP.tools.generate_image import generate_image

    # Paths to delete on exit, regardless of the outcome.
    cleanup_paths: list[Path] = []
    try:
        # Reserve a temporary file name for the output.
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
            output_path = f.name
        cleanup_paths.append(Path(output_path))

        result = await generate_image(
            prompt="A simple geometric pattern with blue triangles",
            output_path=output_path,
            model="imagen-4-fast",
            size="1024x1024",
        )
        if result.file_path is not None:
            # The tool reports where it saved the image; clean that up too in
            # case it differs from the requested path.
            cleanup_paths.append(Path(result.file_path))

        assert result.success, f"Generation failed: {result.error}"
        assert result.model_used == "imagen-4-fast", f"Wrong model: {result.model_used}"
        assert result.file_path is not None, "No file path returned"
        assert result.file_size_bytes is not None, "No file size returned"

        # Verify file exists and has content.
        file_path = Path(result.file_path)
        assert file_path.exists(), f"Output file not found: {result.file_path}"
        assert result.file_size_bytes > 1000, "Image file too small"

        print(f"✓ Image saved to: {result.file_path}")
        print(f"✓ File size: {result.file_size_bytes} bytes")
        if result.revised_prompt:
            print(f"✓ Revised prompt: {result.revised_prompt[:50]}...")

        results.record("tool_generate_image", True)
        return True
    except Exception as e:
        print(f"✗ Error: {e}")
        results.record("tool_generate_image", False, str(e))
        return False
    finally:
        for path in cleanup_paths:
            try:
                path.unlink(missing_ok=True)
            except OSError as cleanup_error:
                # Cleanup is best-effort; never let it mask the test outcome.
                print(f"⚠ Could not remove {path}: {cleanup_error}")
async def test_tool_generate_image_validation():
    """Test that generate_image rejects an invalid model and an invalid size.

    The temporary output file is removed afterwards, whether the test passed
    or failed.

    Returns:
        True when every check passes, False otherwise.
    """
    print("\n" + "=" * 60)
    print("TEST: Tool - generate_image (validation)")
    print("=" * 60)
    from Imagen_MCP.tools.generate_image import generate_image

    output_path = None
    try:
        # Reserve a temporary file name for the output.
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
            output_path = f.name

        # Invalid model.
        result = await generate_image(
            prompt="Test",
            output_path=output_path,
            model="invalid-model-xyz",
        )
        assert not result.success, "Should fail with invalid model"
        assert result.error is not None, "Error should not be None"
        assert "Invalid model" in result.error, f"Wrong error: {result.error}"
        print(f"✓ Invalid model rejected: {result.error[:50]}...")

        # Invalid size.
        result = await generate_image(
            prompt="Test",
            output_path=output_path,
            model="imagen-4-fast",
            size="9999x9999",
        )
        assert not result.success, "Should fail with invalid size"
        assert result.error is not None, "Error should not be None"
        assert "not supported" in result.error.lower(), f"Wrong error: {result.error}"
        print(f"✓ Invalid size rejected: {result.error[:50]}...")

        results.record("tool_generate_image_validation", True)
        return True
    except Exception as e:
        print(f"✗ Error: {e}")
        results.record("tool_generate_image_validation", False, str(e))
        return False
    finally:
        if output_path is not None:
            try:
                Path(output_path).unlink(missing_ok=True)
            except OSError as cleanup_error:
                # Cleanup is best-effort; never let it mask the test outcome.
                print(f"⚠ Could not remove {output_path}: {cleanup_error}")
async def test_tool_start_image_batch():
    """Test the start_image_batch tool against the real API.

    On success the session id, session manager and output directory are
    published through module globals so the follow-up batch tests can reuse
    them. On failure the temporary output directory is removed, because no
    later test will use it.

    Returns:
        True when every check passes, False otherwise.
    """
    global _test_session_id, _test_session_manager, _test_output_dir
    import shutil  # local import: only needed for cleanup on failure

    print("\n" + "=" * 60)
    print("TEST: Tool - start_image_batch")
    print("=" * 60)
    from Imagen_MCP.tools.batch_generate import start_image_batch
    from Imagen_MCP.services.session_manager import SessionManager
    from Imagen_MCP.services.nexos_client import NexosClient

    output_dir = None
    try:
        # Create a session manager with a real client.
        client = NexosClient.from_env()
        session_manager = SessionManager(client=client)

        # Create a temporary directory for batch output.
        output_dir = tempfile.mkdtemp(prefix="imagen_batch_")

        result = await start_image_batch(
            prompt="A colorful abstract painting",
            output_dir=output_dir,
            count=2,  # Small count for faster test
            model="imagen-4-fast",
            size="1024x1024",
            session_manager=session_manager,
        )
        assert result.success, f"Batch start failed: {result.error}"
        assert result.session_id is not None, "No session ID returned"
        assert result.first_image_path is not None, "No first image path returned"
        assert result.first_image_size_bytes is not None, "No first image size returned"

        print(f"✓ Session ID: {result.session_id}")
        print(f"✓ First image saved to: {result.first_image_path}")
        print(f"✓ First image size: {result.first_image_size_bytes} bytes")
        print(f"✓ Pending count: {result.pending_count}")

        # Publish the session for the follow-up tests only once every check
        # has passed.
        _test_session_id = result.session_id
        _test_session_manager = session_manager
        _test_output_dir = output_dir

        results.record("tool_start_image_batch", True)
        return True
    except Exception as e:
        print(f"✗ Error: {e}")
        results.record("tool_start_image_batch", False, str(e))
        if output_dir is not None:
            # The follow-up tests are skipped without a session, so nothing
            # else will use this directory.
            shutil.rmtree(output_dir, ignore_errors=True)
        return False
async def test_tool_get_batch_status():
    """Query the status of the batch session opened by the start-batch test.

    The test is recorded as skipped when no session id is available.

    Returns:
        True on success, False on failure, None when skipped.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("TEST: Tool - get_batch_status")
    print(rule)
    from Imagen_MCP.tools.batch_generate import get_batch_status

    try:
        # The session globals are only read here, so no ``global`` statement
        # is required.
        if not _test_session_id:
            results.skip("tool_get_batch_status", "No session from start_image_batch")
            return None

        status = await get_batch_status(
            session_id=_test_session_id,
            session_manager=_test_session_manager,
        )
        assert status.success, f"Status check failed: {status.error}"
        assert status.session_id == _test_session_id, "Session ID mismatch"

        known_states = ("created", "generating", "partial", "completed", "failed")
        assert status.status in known_states, f"Invalid status: {status.status}"

        # The batch test starts the session with count=2.
        assert status.total_count == 2, (
            f"Expected total_count=2, got {status.total_count}"
        )

        print(f"✓ Session ID: {status.session_id}")
        print(f"✓ Status: {status.status}")
        print(f"✓ Completed: {status.completed_count}/{status.total_count}")
        print(f"✓ Pending: {status.pending_count}")
        results.record("tool_get_batch_status", True)
        return True
    except Exception as exc:
        print(f"✗ Error: {exc}")
        results.record("tool_get_batch_status", False, str(exc))
        return False
async def test_tool_get_next_image():
    """Test the get_next_image tool on the session opened by the batch test.

    Outcomes:
        * passed  - the tool succeeded (with or without a further image);
        * skipped - no session is available, or the tool reported an error
          (for example a timeout on slow generation), which is inconclusive
          and must not be reported as a pass;
        * failed  - an exception was raised.

    Returns:
        True on success, False on failure, None when skipped.
    """
    print("\n" + "=" * 60)
    print("TEST: Tool - get_next_image")
    print("=" * 60)
    from Imagen_MCP.tools.batch_generate import get_next_image

    try:
        if not _test_session_id:
            results.skip("tool_get_next_image", "No session from start_image_batch")
            return None

        # Create output path for the next image.
        output_path = str(Path(_test_output_dir) / "image_002.png")

        # Get the second image (first was returned by start_image_batch).
        result = await get_next_image(
            session_id=_test_session_id,
            output_path=output_path,
            timeout=120.0,
            session_manager=_test_session_manager,
        )

        # With count=2, we should get one more image or the session is done.
        if result.success and result.file_path is not None:
            print(f"✓ Second image saved to: {result.file_path}")
            print(f"✓ File size: {result.file_size_bytes} bytes")
            print(f"✓ Has more: {result.has_more}")
            print(f"✓ Pending: {result.pending_count}")
        elif result.success:
            print("✓ No more images (session complete)")
            print(f"✓ Has more: {result.has_more}")
        else:
            # Might time out if generation is slow. That proves nothing about
            # the tool, so report it as skipped rather than passed.
            print(f"⚠ Result: {result.error}")
            results.skip("tool_get_next_image", f"Inconclusive: {result.error}")
            return None

        results.record("tool_get_next_image", True)
        return True
    except Exception as e:
        print(f"✗ Error: {e}")
        results.record("tool_get_next_image", False, str(e))
        return False
async def test_tool_get_next_image_invalid_session():
    """Test that get_next_image rejects an unknown session id.

    The temporary output file is removed afterwards, even when the tool call
    itself raises.

    Returns:
        True when every check passes, False otherwise.
    """
    print("\n" + "=" * 60)
    print("TEST: Tool - get_next_image (invalid session)")
    print("=" * 60)
    from Imagen_MCP.tools.batch_generate import get_next_image
    from Imagen_MCP.services.session_manager import SessionManager

    output_path = None
    try:
        session_manager = SessionManager()

        # Reserve a temporary file name for the output.
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
            output_path = f.name

        result = await get_next_image(
            session_id="invalid-session-id-xyz",
            output_path=output_path,
            timeout=1.0,
            session_manager=session_manager,
        )
        assert not result.success, "Should fail with invalid session"
        assert result.error is not None, "Error should not be None"
        assert "not found" in result.error.lower(), f"Wrong error: {result.error}"
        print(f"✓ Invalid session rejected: {result.error}")

        results.record("tool_get_next_image_invalid", True)
        return True
    except Exception as e:
        print(f"✗ Error: {e}")
        results.record("tool_get_next_image_invalid", False, str(e))
        return False
    finally:
        if output_path is not None:
            try:
                Path(output_path).unlink(missing_ok=True)
            except OSError as cleanup_error:
                # Cleanup is best-effort; never let it mask the test outcome.
                print(f"⚠ Could not remove {output_path}: {cleanup_error}")
async def test_tool_get_batch_status_invalid_session():
    """Check that get_batch_status rejects an unknown session id.

    Returns:
        True when every check passes, False otherwise.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("TEST: Tool - get_batch_status (invalid session)")
    print(rule)
    from Imagen_MCP.tools.batch_generate import get_batch_status
    from Imagen_MCP.services.session_manager import SessionManager

    try:
        # A freshly constructed manager is used for the unknown-id lookup.
        fresh_manager = SessionManager()
        outcome = await get_batch_status(
            session_id="invalid-session-id-xyz",
            session_manager=fresh_manager,
        )

        assert not outcome.success, "Should fail with invalid session"
        assert outcome.error is not None, "Error should not be None"
        assert "not found" in outcome.error.lower(), f"Wrong error: {outcome.error}"
        print(f"✓ Invalid session rejected: {outcome.error}")

        results.record("tool_get_batch_status_invalid", True)
        return True
    except Exception as exc:
        print(f"✗ Error: {exc}")
        results.record("tool_get_batch_status_invalid", False, str(exc))
        return False
# ============================================================================
# MCP SERVER INTEGRATION TESTS
# ============================================================================
def test_mcp_server_tools_registered():
    """Check that every expected tool name is registered on the MCP server.

    Tool names are read from the server's private ``_tool_manager._tools``
    mapping; when either attribute is absent the list of names is empty and
    the first expected tool is reported as not registered.

    Returns:
        True when every expected tool is present, False otherwise.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("TEST: MCP Server - Tools Registration")
    print(rule)
    try:
        from Imagen_MCP.server import mcp

        # FastMCP stores tools in _tool_manager.
        try:
            tool_table = mcp._tool_manager._tools
        except AttributeError:
            tool_names = []
        else:
            tool_names = list(tool_table.keys())

        for wanted in (
            "generate_image",
            "start_image_batch",
            "get_next_image",
            "get_batch_status",
            "list_models",
            "get_model_details",
        ):
            assert wanted in tool_names, f"Tool '{wanted}' not registered"
            print(f"✓ Tool registered: {wanted}")

        results.record("mcp_server_tools", True)
        return True
    except Exception as exc:
        print(f"✗ Error: {exc}")
        results.record("mcp_server_tools", False, str(exc))
        return False
def test_mcp_server_resources_registered():
    """Smoke-test the MCP server object and print its registered resources.

    The resource URIs found in the private ``_resource_manager._resources``
    mapping are only printed, not asserted on. The checks performed are that
    the server object can be imported, is not ``None`` and has a ``run``
    attribute.

    Returns:
        True when the checks pass, False otherwise.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("TEST: MCP Server - Resources Registration")
    print(rule)
    try:
        from Imagen_MCP.server import mcp

        # Collect registered resource URIs, if the private mapping exists.
        try:
            resource_table = mcp._resource_manager._resources
        except AttributeError:
            resource_uris = []
        else:
            resource_uris = list(resource_table.keys())

        # Resources may be stored differently, so this is informational only.
        print(f"Registered resources: {resource_uris}")

        # At minimum, the server must be importable and runnable.
        assert mcp is not None, "MCP server object is None"
        assert hasattr(mcp, "run"), "MCP server missing 'run' method"
        print("✓ MCP server initialized")
        print("✓ Server has 'run' method")

        results.record("mcp_server_resources", True)
        return True
    except Exception as exc:
        print(f"✗ Error: {exc}")
        results.record("mcp_server_resources", False, str(exc))
        return False
# ============================================================================
# MAIN
# ============================================================================
# Global variables for session tests
_test_session_id: str | None = None
_test_session_manager = None
async def main():
"""Run all e2e tests."""
print("=" * 60)
print("IMAGEN MCP SERVER - COMPREHENSIVE E2E TESTS")
print("Testing all tools and resources with real Nexos.ai API")
print("=" * 60)
# Check API key
api_key = os.getenv("NEXOS_API_KEY")
if not api_key:
print("\n✗ ERROR: NEXOS_API_KEY not set in environment")
print("Please set NEXOS_API_KEY in .env file")
return 1
print(f"\nAPI Key: {api_key[:20]}...{api_key[-10:]}")
# Run resource tests (no API calls)
test_resource_model_catalog()
test_resource_model_details()
# Run model listing tools (no API calls)
test_tool_list_models()
test_tool_get_model_details()
# Run MCP server tests
test_mcp_server_tools_registered()
test_mcp_server_resources_registered()
# Run tool validation tests (no API calls)
await test_tool_generate_image_validation()
await test_tool_get_next_image_invalid_session()
await test_tool_get_batch_status_invalid_session()
# Run tool tests with real API
await test_tool_generate_image()
await test_tool_start_image_batch()
await test_tool_get_batch_status()
await test_tool_get_next_image()
# Summary
print("\n" + "=" * 60)
print("TEST SUMMARY")
print("=" * 60)
passed, failed, skipped = results.summary()
for name, result in results.results.items():
if result is True:
status = "✓ PASSED"
elif result is False:
status = f"✗ FAILED: {results.errors.get(name, 'Unknown')}"
else:
status = f"⊘ SKIPPED: {results.errors.get(name, 'Unknown')}"
print(f" {name}: {status}")
print(f"\nTotal: {passed} passed, {failed} failed, {skipped} skipped")
if failed > 0:
print("\n✗ SOME TESTS FAILED")
return 1
else:
print("\n✓ ALL TESTS PASSED")
return 0
if __name__ == "__main__":
    # Use main()'s return value (0 or 1) as the process exit status.
    sys.exit(asyncio.run(main()))