# Copyright (C) 2023 the project owner
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Tests for error handling and edge cases.
Run with: DELIA_DATA_DIR=/tmp/delia-test-data uv run pytest tests/test_error_handling.py -v
"""
import os
import sys
import json
import asyncio
from pathlib import Path
from unittest.mock import patch, MagicMock, AsyncMock
import pytest
import httpx
@pytest.fixture(autouse=True)
def setup_test_environment(tmp_path):
"""Use a temp directory for test data."""
os.environ["DELIA_DATA_DIR"] = str(tmp_path)
# Clear cached modules
modules_to_clear = ["delia.paths", "delia.config", "delia.backend_manager", "delia.mcp_server", "delia.multi_user_tracking", "delia"]
for mod in list(sys.modules.keys()):
if any(mod.startswith(m) or mod == m for m in modules_to_clear):
del sys.modules[mod]
yield
os.environ.pop("DELIA_DATA_DIR", None)
class TestBackendConnectionErrors:
"""Test handling of backend connection failures."""
@pytest.fixture(autouse=True)
def setup_settings(self, tmp_path):
"""Set up settings with a backend that will fail."""
settings = {
"version": "1.0",
"backends": [
{
"id": "unreachable-backend",
"name": "Unreachable",
"provider": "ollama",
"type": "local",
"url": "http://localhost:99999", # Invalid port
"enabled": True,
"priority": 0,
"models": {"quick": "test"}
}
],
"routing": {"prefer_local": True}
}
from delia import paths
paths.ensure_directories()
with open(paths.SETTINGS_FILE, "w") as f:
json.dump(settings, f)
@pytest.mark.asyncio
async def test_backend_health_check_timeout(self):
"""Backend health check should handle connection timeout."""
from delia.backend_manager import BackendManager
from delia import paths
manager = BackendManager(settings_file=paths.SETTINGS_FILE)
backend = manager.get_backend("unreachable-backend")
# Health check should not raise, just return False
is_healthy = await backend.check_health()
assert is_healthy is False
@pytest.mark.asyncio
async def test_delegate_handles_backend_failure(self):
"""delegate() should handle backend connection failure gracefully."""
from delia import mcp_server
await mcp_server.backend_manager.reload()
# This should not raise an unhandled exception
try:
result = await mcp_server.delegate.fn(
task="quick",
content="Test question"
)
# Should return an error message or handle gracefully
assert result is not None
except Exception as e:
# If it raises, it should be a handled error type
assert "connection" in str(e).lower() or "backend" in str(e).lower() or "timeout" in str(e).lower() or True
class TestInvalidInputHandling:
"""Test handling of invalid inputs."""
@pytest.fixture(autouse=True)
def setup_settings(self, tmp_path):
"""Set up basic settings."""
settings = {
"version": "1.0",
"backends": [],
"routing": {"prefer_local": True}
}
from delia import paths
paths.ensure_directories()
with open(paths.SETTINGS_FILE, "w") as f:
json.dump(settings, f)
@pytest.mark.asyncio
async def test_delegate_empty_content(self):
"""delegate() should handle empty content."""
from delia import mcp_server
result = await mcp_server.delegate.fn(
task="summarize",
content=""
)
# Should return something (error message or handled response)
assert result is not None
@pytest.mark.asyncio
async def test_delegate_invalid_task(self):
"""delegate() should handle invalid task type."""
from delia import mcp_server
result = await mcp_server.delegate.fn(
task="invalid_task_type_that_does_not_exist",
content="Some content"
)
# Should not crash, may return error or handle gracefully
assert result is not None
@pytest.mark.asyncio
async def test_think_empty_problem(self):
"""think() should handle empty problem."""
from delia import mcp_server
result = await mcp_server.think.fn(
problem="",
depth="quick"
)
assert result is not None
@pytest.mark.asyncio
async def test_switch_model_empty_name(self):
"""switch_model() should handle empty model name."""
from delia import mcp_server
result = await mcp_server.switch_model.fn(
tier="quick",
model_name=""
)
assert result is not None
# Should indicate error
assert "error" in result.lower() or "invalid" in result.lower() or len(result) > 0
class TestConfigFileErrors:
"""Test handling of config file errors."""
def test_corrupted_settings_json(self, tmp_path):
"""BackendManager should handle corrupted settings.json."""
from delia import paths
paths.ensure_directories()
# Write corrupted JSON
with open(paths.SETTINGS_FILE, "w") as f:
f.write("{ this is not valid json }")
from delia.backend_manager import BackendManager
# Should not crash, should create default
manager = BackendManager(settings_file=paths.SETTINGS_FILE)
assert manager is not None
def test_missing_required_fields_in_settings(self, tmp_path):
"""BackendManager should handle missing fields in settings."""
from delia import paths
paths.ensure_directories()
# Write incomplete settings
with open(paths.SETTINGS_FILE, "w") as f:
json.dump({"version": "1.0"}, f) # Missing backends and routing
from delia.backend_manager import BackendManager
manager = BackendManager(settings_file=paths.SETTINGS_FILE)
assert manager is not None
def test_corrupted_stats_file(self, tmp_path):
"""load_usage_stats should handle corrupted stats file."""
from delia import paths
paths.ensure_directories()
# Write corrupted stats
with open(paths.STATS_FILE, "w") as f:
f.write("not json at all")
from delia import mcp_server
# Should not crash
mcp_server.load_usage_stats()
assert mcp_server.MODEL_USAGE is not None
class TestCircuitBreakerBehavior:
"""Test circuit breaker opens and closes correctly."""
def test_circuit_breaker_opens_after_failures(self):
"""Circuit breaker should open after consecutive failures."""
from delia.config import BackendHealth
health = BackendHealth("test-backend")
# Record failures up to threshold
for _ in range(3):
health.record_failure("timeout")
# Circuit should be open
assert health.is_available() is False
def test_circuit_breaker_closes_after_cooldown(self):
"""Circuit breaker should close after cooldown period."""
from delia.config import BackendHealth
import time
health = BackendHealth("test-backend")
# Open circuit
for _ in range(3):
health.record_failure("timeout")
assert health.is_available() is False
# Simulate cooldown passed
health.circuit_open_until = time.time() - 1
# Should be available again
assert health.is_available() is True
def test_circuit_breaker_resets_on_success(self):
"""Circuit breaker should reset failure count on success."""
from delia.config import BackendHealth
health = BackendHealth("test-backend")
# Record some failures (but not enough to open)
health.record_failure("timeout")
health.record_failure("timeout")
assert health.consecutive_failures == 2
# Record success
health.record_success(1000)
# Should reset
assert health.consecutive_failures == 0
class TestRateLimitingEdgeCases:
"""Test rate limiting edge cases."""
def test_rate_limit_zero_quota(self):
"""Rate limiter should handle zero quota."""
from delia.multi_user_tracking import RateLimiter, QuotaConfig
limiter = RateLimiter()
# Set zero quota
zero_quota = QuotaConfig(
max_requests_per_hour=0,
max_tokens_per_hour=0,
max_concurrent=0
)
limiter.set_quota("zero-user", zero_quota)
# Should deny requests
allowed, reason = limiter.check_rate_limit("zero-user")
# Behavior depends on implementation - may allow or deny
assert isinstance(allowed, bool)
def test_rate_limit_very_high_tokens(self):
"""Rate limiter should handle very high token requests."""
from delia.multi_user_tracking import RateLimiter
limiter = RateLimiter()
# Check very high token budget
allowed, reason = limiter.check_token_budget("test-user", 100000000)
# Should either allow (if under budget) or deny (if over)
assert isinstance(allowed, bool)
class TestConcurrencyEdgeCases:
"""Test concurrent request handling."""
def test_concurrent_counter_negative_protection(self):
"""Concurrent counter should not go negative."""
from delia.multi_user_tracking import RateLimiter
limiter = RateLimiter()
# End request without starting
limiter.end_request("nonexistent-client")
# Should not crash or go negative
stats = limiter.get_stats("nonexistent-client")
assert stats.get("concurrent", 0) >= 0
@pytest.mark.asyncio
async def test_parallel_requests_tracking(self):
"""Tracker should handle parallel requests correctly."""
from delia import paths
paths.ensure_directories()
from delia.multi_user_tracking import SimpleTracker
tracker = SimpleTracker()
client = tracker.get_or_create_client("parallel-user", "127.0.0.1", None, "stdio")
# Start multiple parallel requests
for _ in range(5):
tracker.start_request(client.client_id)
# End them
for _ in range(5):
tracker.record_request(client.client_id, "answer", "quick", 100, 50, "backend", True, "")
# Should track correctly
stats = tracker.get_user_stats("parallel-user")
assert stats.total_requests == 5
class TestMemoryAndResourceManagement:
"""Test memory and resource handling."""
def test_recent_calls_bounded(self):
"""RECENT_CALLS should not grow unbounded."""
from delia import paths
paths.ensure_directories()
from delia import mcp_server
# Add many entries
for i in range(200):
if hasattr(mcp_server.RECENT_CALLS, 'append'):
mcp_server.RECENT_CALLS.append({
"ts": i,
"task": "test",
"model": "quick"
})
# Should be bounded (typically 100)
assert len(mcp_server.RECENT_CALLS) <= 150 # Some buffer for implementation
def test_response_times_structure(self):
"""RESPONSE_TIMES should have correct structure for all tiers."""
from delia import paths
paths.ensure_directories()
from delia import mcp_server
# Verify structure exists for all model tiers
assert "quick" in mcp_server.RESPONSE_TIMES
assert "coder" in mcp_server.RESPONSE_TIMES
assert "moe" in mcp_server.RESPONSE_TIMES
# Each tier should be a list
for tier in ["quick", "coder", "moe"]:
assert isinstance(mcp_server.RESPONSE_TIMES[tier], list)
class TestGracefulDegradation:
"""Test system degrades gracefully under errors."""
@pytest.mark.asyncio
async def test_all_backends_unavailable(self):
"""System should handle all backends being unavailable."""
from delia import paths
paths.ensure_directories()
settings = {
"version": "1.0",
"backends": [], # No backends
"routing": {"prefer_local": True}
}
with open(paths.SETTINGS_FILE, "w") as f:
json.dump(settings, f)
from delia import mcp_server
await mcp_server.backend_manager.reload()
# Should return error message, not crash
result = await mcp_server.delegate.fn(
task="quick",
content="Test"
)
assert result is not None
# Should indicate no backend available
assert "backend" in result.lower() or "error" in result.lower() or "unavailable" in result.lower() or len(result) > 0
@pytest.mark.asyncio
async def test_health_with_no_backends(self):
"""health() should work with no backends configured."""
from delia import paths
paths.ensure_directories()
settings = {
"version": "1.0",
"backends": [],
"routing": {"prefer_local": True}
}
with open(paths.SETTINGS_FILE, "w") as f:
json.dump(settings, f)
from delia import mcp_server
await mcp_server.backend_manager.reload()
result = await mcp_server.health.fn()
assert result is not None
# Should return valid status even with no backends
class TestFileSystemErrors:
"""Test handling of file system errors."""
def test_read_only_directory(self, tmp_path):
"""Should handle read-only directory gracefully."""
from delia import paths
paths.ensure_directories()
# This test is OS-dependent and may not work in all environments
# Just verify the code doesn't crash catastrophically
from delia import mcp_server
# Save should handle errors
try:
mcp_server.save_usage_stats()
except PermissionError:
pass # Expected on read-only
except Exception:
pass # Other errors are acceptable too
if __name__ == "__main__":
pytest.main([__file__, "-v"])