#!/usr/bin/env python3
"""
Test script for Nanonets MCP Server OCR functionality
"""
import asyncio
import json
import time
import requests
import base64
from pathlib import Path
def _extract_ocr_text(result):
    """Return the text of the first content item in a tool-call *result*.

    Falls back to ``str()`` of the item (or of the whole result) when no
    ``text`` attribute is available.
    """
    # Only index genuine sequences: a bare ``str`` also supports iteration
    # and ``len()``, and indexing it would yield just its first character.
    if isinstance(result, (list, tuple)) and result:
        first = result[0]
        # NOTE(review): some MCP SDK versions appear to return a
        # (content_blocks, structured_output) pair; unwrap one level so the
        # first content block is still found. Confirm against the SDK in use.
        if isinstance(first, (list, tuple)) and first:
            first = first[0]
        return first.text if hasattr(first, "text") else str(first)
    return str(result)


async def test_mcp_tools(project_root="/home/arne/src/nanonets_mcp"):
    """Exercise the MCP tools in-process, without going through HTTP.

    Puts *project_root* on ``sys.path``, imports ``mcp`` from
    ``nanonets_mcp.server``, lists the registered tools, calls
    ``get_supported_formats``, then runs ``ocr_image_to_markdown`` on the
    base64 fixture ``tests/research_document_b64.txt`` and saves the
    returned text to ``tests/ocr_result.txt``.

    Args:
        project_root: Directory containing the ``nanonets_mcp`` package and
            its ``tests`` folder (``str`` or ``Path``). Defaults to the
            checkout location this script was written against.

    Returns:
        True when OCR ran and the result file was written; False when the
        fixture is missing or any step raised (the traceback is printed).
    """
    import sys
    import traceback

    root = Path(project_root)
    fixture_path = root / "tests" / "research_document_b64.txt"
    result_path = root / "tests" / "ocr_result.txt"

    # Deliberately broad: this is the boundary of a smoke test, so an import
    # error, a tool failure and an I/O error are all reported the same way.
    try:
        if str(root) not in sys.path:
            sys.path.append(str(root))
        from nanonets_mcp.server import mcp

        print("🔍 Testing MCP tools locally...")

        # Test 1: List available tools
        tools = await mcp.list_tools()
        print(f"✅ Available tools: {[tool.name for tool in tools]}")

        # Test 2: Get supported formats
        formats = await mcp.call_tool('get_supported_formats', {})
        print(f"✅ Supported formats: {len(formats)} items")

        # Test 3: Load test image
        if not fixture_path.exists():
            print(f"❌ Test image not found: {fixture_path}")
            return False
        image_b64 = fixture_path.read_text(encoding="utf-8")
        print(f"📷 Loaded test image: {len(image_b64)} characters")

        # Test 4: OCR processing
        print("🤖 Starting OCR processing...")
        # perf_counter is monotonic, so the duration cannot be skewed by
        # wall-clock adjustments while the OCR call is running.
        started = time.perf_counter()
        result = await mcp.call_tool('ocr_image_to_markdown', {
            'image_data': image_b64,
            'image_format': 'png',
        })
        processing_time = time.perf_counter() - started
        print(f"✅ OCR completed in {processing_time:.2f} seconds")

        print("📝 OCR Result:")
        print("=" * 80)
        ocr_text = _extract_ocr_text(result)
        print(ocr_text)
        print("=" * 80)

        # Save result. Explicit UTF-8 so non-ASCII OCR output does not
        # depend on the platform's default encoding.
        header = (
            f"OCR Processing Time: {processing_time:.2f} seconds\n"
            f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}\n"
            + "=" * 80 + "\n"
        )
        result_path.write_text(header + ocr_text, encoding="utf-8")
        print(f"💾 Result saved to: {result_path}")
        return True
    except Exception as e:
        print(f"❌ Error: {e}")
        traceback.print_exc()
        return False
def test_http_server(base_url="http://localhost:8005"):
    """Check that the HTTP server and its SSE endpoint answer requests.

    Sends a GET to *base_url*, then opens ``<base_url>/sse`` as a streamed
    request. A read timeout on the SSE request is counted as success.

    Args:
        base_url: Root URL of the running server. Defaults to the local
            instance on port 8005.

    Returns:
        True when the server responded; False when a request failed (the
        error is printed).
    """
    print("🌐 Testing HTTP server...")
    root_url = base_url.rstrip("/")
    try:
        # Test basic connectivity
        response = requests.get(root_url, timeout=5)
        print(f"✅ Server responding: {response.status_code}")

        # Test SSE endpoint (just check if it responds)
        try:
            # A streamed response holds its connection until it is closed,
            # so release it as soon as the status line has been seen.
            with requests.get(f"{root_url}/sse", timeout=2, stream=True) as sse_response:
                print(f"✅ SSE endpoint responding: {sse_response.status_code}")
        except requests.exceptions.ReadTimeout:
            print("✅ SSE endpoint streaming (timeout expected)")
        return True
    except requests.exceptions.RequestException as e:
        print(f"❌ HTTP server error: {e}")
        return False
async def main():
    """Run the HTTP check and the MCP tool check, then print a summary.

    Returns:
        True when both checks passed, otherwise False.
    """
    print("🧪 Nanonets MCP Server Test Suite")
    print("=" * 50)

    # Test 1: HTTP Server
    http_ok = test_http_server()

    # Test 2: MCP Tools
    mcp_ok = await test_mcp_tools()

    print("\n📊 Test Results:")
    print("=" * 50)
    print(f"HTTP Server: {'✅ PASS' if http_ok else '❌ FAIL'}")
    print(f"MCP Tools: {'✅ PASS' if mcp_ok else '❌ FAIL'}")

    all_ok = bool(http_ok and mcp_ok)
    if all_ok:
        print("\n🎉 ALL TESTS PASSED! Nanonets MCP Server is fully functional!")
    else:
        print("\n⚠️ Some tests failed. Check the logs above.")
    return all_ok
if __name__ == "__main__":
    # Script entry point: drive the async test suite on a fresh event loop.
    asyncio.run(main())