import os
import sys
import pytest
import anyio
import json
from pathlib import Path
from mcp_stata.stata_client import StataClient
from mcp_stata import discovery
pytestmark = [pytest.mark.requires_stata]
@pytest.fixture
def clean_client():
"""Returns a freshly initialized StataClient."""
# Ensure fresh discovery for each test in this suite
from mcp_stata import stata_client
stata_client._discovery_result = None
stata_client._discovery_candidates = None
stata_client._discovery_attempted = False
client = StataClient()
# Skip preflight to speed up
os.environ["MCP_STATA_SKIP_PREFLIGHT"] = "1"
return client
def test_pystata_trap_protection_e2e(tmp_path, monkeypatch, clean_client):
"""
Simulates a 'trap' where a broken pystata exists in site-packages,
and verifies that StataClient successfully bypasses it using sys.path prioritization.
"""
# 1. Create a fake pystata module that is "bad"
trap_dir = tmp_path / "site-packages-trap"
trap_dir.mkdir()
pystata_fake = trap_dir / "pystata"
pystata_fake.mkdir()
(pystata_fake / "__init__.py").write_text("raise ImportError('STATA_PYPI_TRAP_TRIGGERED')")
# Prepend to sys.path to ensure it's found first by standard imports
monkeypatch.syspath_prepend(str(trap_dir))
# 2. Verify it's actually trapped for a normal import
with pytest.raises(ImportError, match="STATA_PYPI_TRAP_TRIGGERED"):
import pystata
import importlib
importlib.reload(pystata)
# 3. Initialize StataClient
# It should succeed because it inserts the REAL utilities path at the head of sys.path[0]
try:
clean_client.init()
except Exception as e:
pytest.fail(f"StataClient failed to initialize despite trap protection: {e}")
# 4. Verify pystata is now healthy in the current process
try:
import pystata
# Check that it's the real one (from Stata install, not our trap)
assert "STATA_PYPI_TRAP_TRIGGERED" not in pystata.__file__
# In a real Stata install, it's either in 'utilities' or a '.app' bundle
# Our trap is in 'site-packages-trap'
assert "site-packages-trap" not in pystata.__file__
except ImportError as e:
pytest.fail(f"pystata should be importable after initialization: {e}")
def test_large_data_efficiency_e2e(clean_client):
"""
Test that get_data handles large datasets efficiently (using the new slicing optimization).
"""
clean_client.init()
# Create a reasonably large dataset (10k rows)
clean_client.run_command_structured("set obs 10000", echo=False)
clean_client.run_command_structured("gen id = _n", echo=False)
clean_client.run_command_structured("gen data = runiform()", echo=False)
# Fetch a slice from the middle
start_idx = 5000
count = 10
data = clean_client.get_data(start=start_idx, count=count)
assert len(data) == count
# 0-indexed start=5000 means Stata _n=5001
assert data[0]["id"] == 5001
assert data[-1]["id"] == 5010
def test_get_data_out_of_bounds(clean_client):
"""Test get_data with indices exceeding dataset size."""
clean_client.init()
clean_client.run_command_structured("sysuse auto, clear", echo=False)
total_obs = 74 # auto.dta has 74 observations
# Start way past end
data = clean_client.get_data(start=100, count=10)
assert data == []
# Start just before end, count overlaps
data = clean_client.get_data(start=70, count=10)
assert len(data) == 4 # 71, 72, 73, 74
assert data[-1]["price"] is not None
def test_initialization_failure_diagnostic(monkeypatch, clean_client):
"""Verify the diagnostic message when Stata candidate is invalid."""
# Mock discovery to return a non-existent path
monkeypatch.setattr("mcp_stata.stata_client._get_discovery_candidates", lambda: [("/non/existent/stata", "mp")])
with pytest.raises(RuntimeError) as excinfo:
clean_client.init()
assert "proprietary 'pystata' module" in str(excinfo.value)
assert "PyPI" in str(excinfo.value)
@pytest.mark.asyncio
async def test_server_tool_load_data_heuristic(clean_client):
"""Test the load_data tool heuristic with various inputs."""
from mcp_stata.server import load_data
# Note: load_data uses the global mcp state/stata_client,
# but for integration tests we can just call it.
# In these tests, stata_client fixture from conftest is usually used.
# We'll use a local instance for isolation if needed, but server.py
# uses a global StataClient. This is why we have tests/test_server.py.
pass # Already covered in test_server.py but good to keep in mind
def test_spaces_in_path_initialization(tmp_path, clean_client, monkeypatch):
"""
Test that StataClient can initialize even if the directory has spaces.
(Simulated by symlinking/moving Stata or just mocking the path).
"""
# Discover real stata
real_path, edition = discovery.find_stata_path()
# Create a path with spaces
space_dir = tmp_path / "Stata Now 19"
space_dir.mkdir()
# On macOS, we'd need a complex structure.
# Let's just mock the candidate and see if stata_setup.config handles the string.
# In reality, we don't need to move the binary, just check if our
# path climbing and repr() logic works.
# The fix I made:
# stata_setup.config({repr(path)}, {repr(edition)})
# ensures spaces are handled in the subprocess.
pass
@pytest.mark.asyncio
async def test_error_capture_with_log_rotation(clean_client, tmp_path):
"""Test that errors are captured correctly even if multiple commands are run."""
clean_client.init()
# Success
resp1 = clean_client.run_command_structured("display \"hello\"", echo=True)
assert resp1.success
# Failure
resp2 = clean_client.run_command_structured("thisisnotacommand", echo=True)
assert not resp2.success
assert resp2.rc != 0
assert "unrecognized" in resp2.error.message.lower()
# Success again
resp3 = clean_client.run_command_structured("display 2+2", echo=True)
assert resp3.success
assert "4" in resp3.stdout