#!/usr/bin/env python3
"""
Test suite for content_operations tool.
Validates all 4 operations and their parameters.
"""
import asyncio
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from tests.mcp_client import create_client
async def test_retrieve_operation_markdown():
"""Test retrieve operation with markdown extraction."""
async with create_client() as client:
result = await client.call_tool(
"content_operations",
{
"operation": "retrieve",
"url": "https://www.python.org",
"extraction_method": "markdown",
},
)
assert result.content, "No content returned"
output = result.content[0].text
assert len(output) > 100, f"Retrieved content too short: {len(output)} chars"
print(f"✅ Retrieve (markdown) test passed - {len(output)} chars")
async def test_retrieve_operation_text():
"""Test retrieve operation with text extraction."""
async with create_client() as client:
result = await client.call_tool(
"content_operations",
{"operation": "retrieve", "url": "https://www.python.org", "extraction_method": "text"},
)
output = result.content[0].text
assert len(output) > 100, f"Text extraction too short: {len(output)} chars"
print("✅ Retrieve (text) test passed")
async def test_retrieve_operation_auto():
"""Test retrieve operation with auto extraction."""
async with create_client() as client:
result = await client.call_tool(
"content_operations",
{
"operation": "retrieve",
"url": "https://www.python.org/about",
"extraction_method": "auto",
},
)
output = result.content[0].text
# Some sites may fail - verify tool handles gracefully
if len(output) < 50:
print(" ⚠️ Site may have blocked request - tool handles gracefully")
assert len(output) >= 0, "Should return empty string on failure, not error"
print("✅ Retrieve (auto) test passed (graceful handling)")
else:
print(f"✅ Retrieve (auto) test passed - {len(output)} chars")
async def test_extract_all_links():
"""Test extract operation with all link types."""
async with create_client() as client:
result = await client.call_tool(
"content_operations",
{
"operation": "extract",
"url": "https://github.com",
"link_type": "all",
"max_links": 20,
},
)
output = result.content[0].text
assert "link" in output.lower() or "extract" in output.lower()
assert len(output) > 200, f"Extract output too minimal: {len(output)} chars"
print("✅ Extract (all links) test passed")
async def test_extract_internal_links():
"""Test extract operation with internal links only."""
async with create_client() as client:
result = await client.call_tool(
"content_operations",
{
"operation": "extract",
"url": "https://www.python.org",
"link_type": "internal",
"max_links": 10,
},
)
output = result.content[0].text
assert "internal" in output.lower() or "link" in output.lower()
assert len(output) > 100, "Internal links extraction too minimal"
print("✅ Extract (internal links) test passed")
async def test_extract_external_links():
"""Test extract operation with external links only."""
async with create_client() as client:
result = await client.call_tool(
"content_operations",
{
"operation": "extract",
"url": "https://www.python.org",
"link_type": "external",
"max_links": 10,
},
)
output = result.content[0].text
assert "external" in output.lower() or "link" in output.lower()
print("✅ Extract (external links) test passed")
async def test_analyze_general():
"""Test analyze operation with general analysis type."""
async with create_client() as client:
test_content = "This is important content for testing. The key finding is that analysis works well. Critical information is processed correctly."
result = await client.call_tool(
"content_operations",
{
"operation": "analyze",
"content": test_content,
"analysis_type": "general",
"extract_key_points": True,
"summarize": True,
},
)
output = result.content[0].text
assert len(output) > 100, f"Analysis output too minimal: {len(output)} chars"
print("✅ Analyze (general) test passed")
async def test_analyze_sentiment():
"""Test analyze operation with sentiment analysis type."""
async with create_client() as client:
test_content = "This is wonderful and amazing content. Everything is great and excellent. Positive outcomes are expected."
result = await client.call_tool(
"content_operations",
{
"operation": "analyze",
"content": test_content,
"analysis_type": "sentiment",
"extract_key_points": False,
"summarize": True,
},
)
output = result.content[0].text
assert len(output) > 50, "Sentiment analysis output too minimal"
print("✅ Analyze (sentiment) test passed")
async def test_analyze_technical():
"""Test analyze operation with technical analysis type."""
async with create_client() as client:
test_content = "The API endpoint returns JSON data. HTTP requests use REST architecture. The system processes data at 1000 req/s."
result = await client.call_tool(
"content_operations",
{
"operation": "analyze",
"content": test_content,
"analysis_type": "technical",
"extract_key_points": True,
},
)
output = result.content[0].text
assert len(output) > 50, "Technical analysis output too minimal"
print("✅ Analyze (technical) test passed")
async def test_stream_operation():
"""Test stream operation."""
async with create_client() as client:
result = await client.call_tool(
"content_operations", {"operation": "stream", "url": "https://www.python.org"}
)
output = result.content[0].text
assert len(output) > 50, f"Stream output too short: {len(output)} chars"
print("✅ Stream operation test passed")
if __name__ == "__main__":
asyncio.run(test_retrieve_operation_markdown())
asyncio.run(test_retrieve_operation_text())
asyncio.run(test_retrieve_operation_auto())
asyncio.run(test_extract_all_links())
asyncio.run(test_extract_internal_links())
asyncio.run(test_extract_external_links())
asyncio.run(test_analyze_general())
asyncio.run(test_analyze_sentiment())
asyncio.run(test_analyze_technical())
asyncio.run(test_stream_operation())
print("\n✅ All content_operations tests passed!")