test_integration.py•4.97 kB
"""Integration tests for MCP CopyQ Server.
These tests require a running CopyQ instance.
Skip them in CI or when CopyQ is not available.
"""
import pytest
import asyncio
import os
# Skip all tests if COPYQ_INTEGRATION env var is not set
pytestmark = pytest.mark.skipif(
os.environ.get("COPYQ_INTEGRATION") != "1",
reason="Integration tests require COPYQ_INTEGRATION=1 and running CopyQ"
)
class TestCopyQClientIntegration:
"""Integration tests for CopyQ client."""
@pytest.mark.asyncio
async def test_list_tabs(self):
"""Test listing tabs from real CopyQ."""
from mcp_copyq.copyq_client import CopyQClient
client = CopyQClient()
tabs = await client.list_tabs()
assert isinstance(tabs, list)
# Should have at least mcp/ tabs if set up correctly
print(f"Found tabs: {tabs}")
@pytest.mark.asyncio
async def test_create_and_delete_tab(self):
"""Test creating and deleting a test tab."""
from mcp_copyq.copyq_client import CopyQClient
client = CopyQClient()
test_tab = "workspace/test_integration"
# Create tab
await client.create_tab(test_tab)
assert await client.tab_exists(test_tab)
# Delete tab
await client.remove_tab(test_tab)
# Note: tab might still show up briefly, wait a bit
await asyncio.sleep(0.5)
@pytest.mark.asyncio
async def test_add_read_delete_item(self):
"""Test full item lifecycle."""
from mcp_copyq.copyq_client import CopyQClient
client = CopyQClient()
test_tab = "workspace/test_integration"
try:
# Create tab
await client.create_tab(test_tab)
# Add item
index = await client.add_item(
test_tab,
"Test item from integration test",
["test", "integration"],
"Test note"
)
assert index == 0
# Read item
item = await client.read_item_full(test_tab, 0)
assert "Test item" in item["text"]
assert "test" in item["tags"]
assert "Test note" in item["note"]
# Delete item
await client.remove_item(test_tab, 0)
count = await client.get_count(test_tab)
assert count == 0
finally:
# Cleanup
try:
await client.remove_tab(test_tab)
except Exception:
pass
class TestMcpReadIntegration:
"""Integration tests for mcp_read."""
@pytest.mark.asyncio
async def test_read_tree(self):
"""Test tree read on real CopyQ."""
from mcp_copyq.tools.read import mcp_read
result = await mcp_read(mode="tree", max_depth=2, max_items=3)
print(f"Tree result:\n{result}")
# Should not be an error
assert "error" not in result or "TAB_NOT_FOUND" not in result
class TestMcpWriteIntegration:
"""Integration tests for mcp_write."""
@pytest.mark.asyncio
async def test_add_update_delete_cycle(self):
"""Test full write cycle on real CopyQ."""
from mcp_copyq.tools.write import mcp_write
from mcp_copyq.tools.read import mcp_read
test_tab = "workspace/test_mcp_write"
try:
# Create tab
result = await mcp_write(mode="tab_create", path=test_tab)
print(f"Create tab: {result}")
# Add item
result = await mcp_write(
mode="add",
tab=test_tab,
text="Integration test item",
tags=["test"]
)
print(f"Add item: {result}")
assert "ok" in result
# Read to verify
result = await mcp_read(mode="item", tab=test_tab, index=0)
print(f"Read item: {result}")
assert "Integration test item" in result
# Update text
result = await mcp_write(
mode="update",
tab=test_tab,
index=0,
field="text",
edit_mode="append",
text=" - updated"
)
print(f"Update: {result}")
assert "ok" in result
# Verify update
result = await mcp_read(mode="item", tab=test_tab, index=0)
print(f"After update: {result}")
assert "updated" in result
# Delete item
result = await mcp_write(mode="delete", tab=test_tab, index=0)
print(f"Delete: {result}")
assert "ok" in result
finally:
# Cleanup
try:
await mcp_write(mode="tab_delete", path=test_tab)
except Exception:
pass
if __name__ == "__main__":
# Run integration tests manually
os.environ["COPYQ_INTEGRATION"] = "1"
pytest.main([__file__, "-v", "-s"])