Unstructured API MCP Server

Official
import os
import pytest
import tempfile
import boto3
import asyncio
import sys
from pathlib import Path
from unittest.mock import patch, MagicMock, AsyncMock
from typing import Dict, Any
import time

# Add the project root to the Python path so that imports work correctly in tests
project_root = str(Path(__file__).parent.parent.absolute())
sys.path.insert(0, project_root)

from connectors.external.firecrawl import (
    _ensure_valid_s3_uri,
    _upload_directory_to_s3,
    _invoke_firecrawl_job,
    wait_for_job_completion,
    _check_job_status,
    _process_crawlhtml_results,
    _process_llmtxt_results,
    wait_for_crawlhtml_completion,
    invoke_firecrawl_crawlhtml,
    check_crawlhtml_status,
    invoke_firecrawl_llmtxt,
    check_llmtxt_status,
    Firecrawl_JobType,
    cancel_crawlhtml_job,
    cancel_llmtxt_job,
    _cancel_job,
)


# Moved from conftest.py - Environment fixture
@pytest.fixture
def mock_environment():
    """Inject fake Firecrawl/AWS credentials into ``os.environ`` for one test.

    Yields:
        The dict of variables that was injected.

    After the test, the environment is wiped and restored from a snapshot
    taken before anything was injected.
    """
    snapshot = dict(os.environ)
    injected = {
        "FIRECRAWL_API_KEY": "test-api-key",
        "AWS_KEY": "test-aws-key",
        "AWS_SECRET": "test-aws-secret",
    }
    os.environ.update(injected)

    yield injected

    # Clearing first also removes any variable the test itself may have added.
    os.environ.clear()
    os.environ.update(snapshot)


# Mock S3 client for testing uploads
@pytest.fixture
def mock_s3_client():
    """Patch ``boto3.client`` so it hands back a single ``MagicMock`` S3 client.

    Yields:
        The mock client, so tests can inspect calls such as ``upload_file``.
    """
    fake_s3 = MagicMock()
    with patch("boto3.client", return_value=fake_s3):
        yield fake_s3


# Test _ensure_valid_s3_uri
def test_ensure_valid_s3_uri_valid_input():
    """Valid S3 URIs are accepted and normalized to end with a slash."""
    expectations = (
        # Already valid URI
        ("s3://bucket/path/", "s3://bucket/path/"),
        # URI missing trailing slash
        ("s3://bucket/path", "s3://bucket/path/"),
        # Simple bucket URI
        ("s3://bucket", "s3://bucket/"),
    )
    for raw_uri, normalized in expectations:
        assert _ensure_valid_s3_uri(raw_uri) == normalized


def test_ensure_valid_s3_uri_invalid_input():
    """Empty, non-S3 and ``None`` URIs each raise ``ValueError`` with a message."""
    rejected = (
        # Empty string
        ("", "S3 URI is required"),
        # Non-S3 URI
        ("http://example.com", "S3 URI must start with 's3://'"),
        # None
        (None, "S3 URI is required"),
    )
    for bad_uri, message in rejected:
        with pytest.raises(ValueError, match=message):
            _ensure_valid_s3_uri(bad_uri)


# Test _upload_directory_to_s3
def test_upload_directory_to_s3(mock_s3_client, mock_environment):
    """Uploading a directory with two files reports two successful uploads."""
    with tempfile.TemporaryDirectory() as temp_dir:
        # Populate the directory with a couple of small files.
        staging = Path(temp_dir)
        (staging / "test1.txt").write_text("Test content 1")
        (staging / "test2.txt").write_text("Test content 2")

        # Credentials come from the mock_environment fixture.
        stats = _upload_directory_to_s3(temp_dir, "s3://test-bucket/prefix/")

        # One upload_file call per file on the mocked client.
        assert mock_s3_client.upload_file.call_count == 2

        assert stats["uploaded_files"] == 2
        assert stats["failed_files"] == 0
        assert stats["total_bytes"] > 0


def test_upload_directory_to_s3_with_errors(mock_s3_client, mock_environment):
    """A failing ``upload_file`` is counted as a failed file, not an upload."""
    # Every upload attempt on the mocked client raises.
    mock_s3_client.upload_file.side_effect = Exception("Mock S3 error")

    with tempfile.TemporaryDirectory() as temp_dir:
        (Path(temp_dir) / "test.txt").write_text("Test content")

        # Credentials come from the mock_environment fixture.
        stats = _upload_directory_to_s3(temp_dir, "s3://test-bucket/prefix/")

        # The statistics must reflect the failure.
        assert stats["uploaded_files"] == 0
        assert stats["failed_files"] == 1
@pytest.mark.asyncio
async def test_check_crawlhtml_status(mock_environment):
    """Test checking the status of a Firecrawl HTML crawl job.

    ``check_crawlhtml_status`` is expected to pass the job id to
    ``_check_job_status`` with job type ``"crawlhtml"`` and return its result.
    """
    with patch("connectors.external.firecrawl._check_job_status") as mock_check_job:
        mock_check_job.return_value = {
            "id": "test-id",
            "status": "completed",
            "completed_urls": 10,
            "total_urls": 10,
        }

        result = await check_crawlhtml_status("test-id")

        assert result["id"] == "test-id"
        assert result["status"] == "completed"
        assert result["completed_urls"] == 10
        assert result["total_urls"] == 10

        # The wrapper must select the crawlhtml job type.
        mock_check_job.assert_awaited_once_with("test-id", "crawlhtml")


@pytest.mark.asyncio
async def test_check_llmtxt_status(mock_environment):
    """Test checking the status of an LLM text generation job.

    ``check_llmtxt_status`` is expected to pass the job id to
    ``_check_job_status`` with job type ``"llmfulltxt"``.
    """
    with patch("connectors.external.firecrawl._check_job_status") as mock_check_job:
        mock_check_job.return_value = {
            "id": "test-id",
            "status": "completed",
            "llmfulltxt": "Generated text content...",
        }

        result = await check_llmtxt_status("test-id")

        assert result["id"] == "test-id"
        assert result["status"] == "completed"
        assert "llmfulltxt" in result

        # The wrapper must select the llmfulltxt job type.
        mock_check_job.assert_awaited_once_with("test-id", "llmfulltxt")


@pytest.mark.asyncio
async def test_check_job_status_crawlhtml(mock_environment):
    """Test generic function for checking job status - crawlhtml type.

    The ``completed``/``total`` fields of the Firecrawl response are expected
    to appear as ``completed_urls``/``total_urls`` in the result.
    """
    with patch("connectors.external.firecrawl.FirecrawlApp") as MockFirecrawlApp:
        mock_firecrawl = MagicMock()
        mock_firecrawl.check_crawl_status.return_value = {
            "status": "completed",
            "completed": 10,
            "total": 10,
        }
        MockFirecrawlApp.return_value = mock_firecrawl

        result = await _check_job_status("test-id", "crawlhtml")

        assert result["id"] == "test-id"
        assert result["status"] == "completed"
        assert result["completed_urls"] == 10
        assert result["total_urls"] == 10

        # The crawl-status client method must be the one used.
        mock_firecrawl.check_crawl_status.assert_called_once_with("test-id")


@pytest.mark.asyncio
async def test_check_job_status_llmtxt(mock_environment):
    """Test generic function for checking job status - llmtxt type.

    The ``data["llmsfulltxt"]`` field of the Firecrawl response is expected to
    appear as ``llmfulltxt`` in the result.
    """
    with patch("connectors.external.firecrawl.FirecrawlApp") as MockFirecrawlApp:
        mock_firecrawl = MagicMock()
        mock_firecrawl.check_generate_llms_text_status.return_value = {
            "status": "completed",
            "data": {
                "llmsfulltxt": "Generated text content...",
                "processedUrls": ["https://example.com/1", "https://example.com/2"],
            },
        }
        MockFirecrawlApp.return_value = mock_firecrawl

        result = await _check_job_status("test-id", "llmfulltxt")

        assert result["id"] == "test-id"
        assert result["status"] == "completed"
        assert "llmfulltxt" in result
        assert result["llmfulltxt"] == "Generated text content..."

        # The LLM-text status client method must be the one used.
        mock_firecrawl.check_generate_llms_text_status.assert_called_once_with("test-id")


@pytest.mark.asyncio
async def test_check_job_status_invalid_type(mock_environment):
    """Test generic function for checking job status with invalid job type.

    An unknown job type is expected to produce an ``error`` entry in the
    result rather than an exception.
    """
    result = await _check_job_status("test-id", "invalid_type")

    assert "error" in result
    assert "Unknown job type" in result["error"]


@pytest.mark.asyncio
async def test_invoke_firecrawl_crawlhtml(mock_environment):
    """Test invoking a Firecrawl HTML crawl job.

    ``invoke_firecrawl_crawlhtml`` is expected to call
    ``_invoke_firecrawl_job`` with keyword arguments, job type ``"crawlhtml"``
    and a ``limit`` of 100 when no limit is passed.
    """
    with patch("connectors.external.firecrawl._invoke_firecrawl_job") as mock_invoke:
        mock_invoke.return_value = {
            "id": "test-id",
            "status": "started",
            "s3_uri": "s3://test-bucket/test-id/",
            "message": "Firecrawl crawlhtml job started and will be automatically processed when complete",
        }

        result = await invoke_firecrawl_crawlhtml(
            url="https://example.com",
            s3_uri="s3://test-bucket/",
        )

        assert result["id"] == "test-id"
        assert result["status"] == "started"
        assert result["s3_uri"] == "s3://test-bucket/test-id/"

        # Inspect the keyword arguments forwarded to the generic helper.
        mock_invoke.assert_awaited_once()
        call_args = mock_invoke.call_args[1]
        assert call_args["url"] == "https://example.com"
        assert call_args["s3_uri"] == "s3://test-bucket/"
        assert call_args["job_type"] == "crawlhtml"
        assert "limit" in call_args["job_params"]
        assert call_args["job_params"]["limit"] == 100


@pytest.mark.asyncio
async def test_invoke_firecrawl_llmtxt(mock_environment):
    """Test invoking an LLM text generation job.

    ``invoke_firecrawl_llmtxt`` is expected to call ``_invoke_firecrawl_job``
    with job type ``"llmfulltxt"``, forwarding ``max_urls`` as ``maxUrls`` and
    setting ``showFullText`` to ``True``.
    """
    with patch("connectors.external.firecrawl._invoke_firecrawl_job") as mock_invoke:
        mock_invoke.return_value = {
            "id": "test-id",
            "status": "started",
            "s3_uri": "s3://test-bucket/test-id/",
            "message": "Firecrawl llmfulltxt job started and will be automatically processed when complete",
        }

        result = await invoke_firecrawl_llmtxt(
            url="https://example.com",
            s3_uri="s3://test-bucket/",
            max_urls=5,
        )

        assert result["id"] == "test-id"
        assert result["status"] == "started"
        assert result["s3_uri"] == "s3://test-bucket/test-id/"

        # Inspect the keyword arguments forwarded to the generic helper.
        mock_invoke.assert_awaited_once()
        call_args = mock_invoke.call_args[1]
        assert call_args["url"] == "https://example.com"
        assert call_args["s3_uri"] == "s3://test-bucket/"
        assert call_args["job_type"] == "llmfulltxt"
        assert "maxUrls" in call_args["job_params"]
        assert call_args["job_params"]["maxUrls"] == 5
        assert call_args["job_params"]["showFullText"] is True


@pytest.mark.asyncio
async def test_invoke_firecrawl_job_crawlhtml(mock_environment):
    """Test generic function for invoking a Firecrawl job - crawlhtml type.

    The job is expected to start through ``async_crawl_url``, the returned
    ``s3_uri`` is expected to have the job id appended, and one background
    task is expected to be scheduled.
    """
    with patch("connectors.external.firecrawl.FirecrawlApp") as MockFirecrawlApp:
        mock_firecrawl = MagicMock()
        mock_firecrawl.async_crawl_url.return_value = {
            "id": "test-id",
            "status": "started",
        }
        MockFirecrawlApp.return_value = mock_firecrawl

        # Patch task creation so no background work actually runs.
        with patch("asyncio.create_task") as mock_create_task:
            result = await _invoke_firecrawl_job(
                url="https://example.com",
                s3_uri="s3://test-bucket/",
                job_type="crawlhtml",
                job_params={"limit": 100},
            )

            assert result["id"] == "test-id"
            assert result["status"] == "started"
            assert result["s3_uri"] == "s3://test-bucket/test-id/"

            # The crawl client method must receive the params unchanged.
            mock_firecrawl.async_crawl_url.assert_called_once_with(
                "https://example.com", params={"limit": 100}
            )

            # A background task must have been scheduled.
            mock_create_task.assert_called_once()


@pytest.mark.asyncio
async def test_invoke_firecrawl_job_llmtxt(mock_environment):
    """Test generic function for invoking a Firecrawl job - llmtxt type.

    The job is expected to start through ``async_generate_llms_text``, the
    returned ``s3_uri`` is expected to have the job id appended, and one
    background task is expected to be scheduled.
    """
    with patch("connectors.external.firecrawl.FirecrawlApp") as MockFirecrawlApp:
        mock_firecrawl = MagicMock()
        mock_firecrawl.async_generate_llms_text.return_value = {
            "id": "test-id",
            "status": "started",
        }
        MockFirecrawlApp.return_value = mock_firecrawl

        # Patch task creation so no background work actually runs.
        with patch("asyncio.create_task") as mock_create_task:
            result = await _invoke_firecrawl_job(
                url="https://example.com",
                s3_uri="s3://test-bucket/",
                job_type="llmfulltxt",
                job_params={"maxUrls": 5, "showFullText": True},
            )

            assert result["id"] == "test-id"
            assert result["status"] == "started"
            assert result["s3_uri"] == "s3://test-bucket/test-id/"

            # The LLM-text client method must receive the params unchanged.
            mock_firecrawl.async_generate_llms_text.assert_called_once_with(
                "https://example.com", params={"maxUrls": 5, "showFullText": True}
            )

            # A background task must have been scheduled.
            mock_create_task.assert_called_once()


@pytest.mark.asyncio
async def test_invoke_firecrawl_job_invalid_type(mock_environment):
    """Test generic function for invoking a Firecrawl job with invalid job type.

    An unknown job type is expected to produce an ``error`` entry in the
    result rather than an exception.
    """
    result = await _invoke_firecrawl_job(
        url="https://example.com",
        s3_uri="s3://test-bucket/",
        job_type="invalid_type",
        job_params={},
    )

    assert "error" in result
    assert "Unknown job type" in result["error"]


@pytest.mark.asyncio
async def test_wait_for_crawlhtml_completion(mock_environment):
    """Test waiting for a Firecrawl HTML crawl job to complete.

    ``wait_for_crawlhtml_completion`` is expected to call
    ``wait_for_job_completion`` positionally with
    ``(crawl_id, s3_uri, "crawlhtml", poll_interval, timeout)``.
    """
    with patch("connectors.external.firecrawl.wait_for_job_completion") as mock_wait:
        mock_wait.return_value = {
            "id": "test-id",
            "status": "completed",
            "s3_uri": "s3://test-bucket/test-id/",
            "file_count": 10,
            "uploaded_files": 10,
            "failed_uploads": 0,
            "upload_size_bytes": 1000,
            "elapsed_time": 60,
            "completed_urls": 10,
            "total_urls": 10,
        }

        result = await wait_for_crawlhtml_completion(
            crawl_id="test-id",
            s3_uri="s3://test-bucket/",
            poll_interval=10,
            timeout=300,
        )

        assert result["id"] == "test-id"
        assert result["status"] == "completed"
        assert result["s3_uri"] == "s3://test-bucket/test-id/"

        mock_wait.assert_awaited_once_with(
            "test-id", "s3://test-bucket/", "crawlhtml", 10, 300
        )


@pytest.mark.asyncio
async def test_wait_for_job_completion_crawlhtml(mock_environment):
    """Test waiting for a job to complete - crawlhtml type.

    The Firecrawl client reports a completed crawl on the first poll; result
    processing, the S3 upload and ``asyncio.sleep`` are all mocked.
    """
    with patch("connectors.external.firecrawl.FirecrawlApp") as MockFirecrawlApp:
        mock_firecrawl = MagicMock()
        mock_firecrawl.check_crawl_status.return_value = {
            "status": "completed",
            "completed": 10,
            "total": 10,
            "data": [
                {
                    "html": "<html><body>Test</body></html>",
                    "metadata": {"url": "https://example.com/1"},
                },
                {
                    "html": "<html><body>Test 2</body></html>",
                    "metadata": {"url": "https://example.com/2"},
                },
            ],
        }
        MockFirecrawlApp.return_value = mock_firecrawl

        # Stub result processing, the upload, and sleeping (to avoid real waits).
        with patch(
            "connectors.external.firecrawl._process_crawlhtml_results"
        ) as mock_process, patch(
            "connectors.external.firecrawl._upload_directory_to_s3"
        ) as mock_upload, patch("asyncio.sleep"):
            mock_process.return_value = 2
            mock_upload.return_value = {
                "uploaded_files": 2,
                "failed_files": 0,
                "total_bytes": 1000,
            }

            result = await wait_for_job_completion(
                job_id="test-id",
                s3_uri="s3://test-bucket/",
                job_type="crawlhtml",
                poll_interval=1,
                timeout=10,
            )

            assert result["id"] == "test-id"
            assert result["status"] == "completed"
            assert result["s3_uri"] == "s3://test-bucket/test-id/"
            assert result["file_count"] == 2
            assert result["uploaded_files"] == 2
            assert result["failed_uploads"] == 0
            assert result["upload_size_bytes"] == 1000
            assert "elapsed_time" in result
            assert result["completed_urls"] == 10
            assert result["total_urls"] == 10


@pytest.mark.asyncio
async def test_wait_for_job_completion_llmtxt(mock_environment):
    """Test waiting for a job to complete - llmtxt type.

    The Firecrawl client reports a completed generation on the first poll;
    result processing, the S3 upload and ``asyncio.sleep`` are all mocked.
    """
    with patch("connectors.external.firecrawl.FirecrawlApp") as MockFirecrawlApp:
        mock_firecrawl = MagicMock()
        mock_firecrawl.check_generate_llms_text_status.return_value = {
            "status": "completed",
            "data": {
                "llmsfulltxt": "Generated text content...",
                "processedUrls": ["https://example.com/1", "https://example.com/2"],
            },
        }
        MockFirecrawlApp.return_value = mock_firecrawl

        # Stub result processing, the upload, and sleeping (to avoid real waits).
        with patch(
            "connectors.external.firecrawl._process_llmtxt_results"
        ) as mock_process, patch(
            "connectors.external.firecrawl._upload_directory_to_s3"
        ) as mock_upload, patch("asyncio.sleep"):
            mock_process.return_value = 1
            mock_upload.return_value = {
                "uploaded_files": 1,
                "failed_files": 0,
                "total_bytes": 500,
            }

            result = await wait_for_job_completion(
                job_id="test-id",
                s3_uri="s3://test-bucket/",
                job_type="llmfulltxt",
                poll_interval=1,
                timeout=10,
            )

            assert result["id"] == "test-id"
            assert result["status"] == "completed"
            assert result["s3_uri"] == "s3://test-bucket/test-id/"
            assert result["file_count"] == 1
            assert result["uploaded_files"] == 1
            assert result["failed_uploads"] == 0
            assert result["upload_size_bytes"] == 500
            assert "elapsed_time" in result
            assert result["processed_urls_count"] == 2


@pytest.mark.asyncio
async def test_wait_for_job_completion_timeout(mock_environment):
    """Test timeout while waiting for a job to complete.

    The Firecrawl client always reports ``in_progress`` and the clock is
    faked to jump past the 10-second timeout, so the result is expected to
    carry status ``"timeout"`` and an error message.
    """
    with patch("connectors.external.firecrawl.FirecrawlApp") as MockFirecrawlApp:
        mock_firecrawl = MagicMock()
        # The job never completes.
        mock_firecrawl.check_crawl_status.return_value = {
            "status": "in_progress",
            "completed": 5,
            "total": 10,
        }
        MockFirecrawlApp.return_value = mock_firecrawl

        # Fake clock: the first read (start time) is 0, every later read is
        # 20, which exceeds the timeout of 10. A callable is used instead of
        # a fixed-length list so that extra time.time() calls cannot exhaust
        # the side effect and raise StopIteration.
        first_reading = iter([0])

        def fake_time():
            return next(first_reading, 20)

        with patch("time.time", side_effect=fake_time), patch("asyncio.sleep"):
            result = await wait_for_job_completion(
                job_id="test-id",
                s3_uri="s3://test-bucket/",
                job_type="crawlhtml",
                poll_interval=1,
                timeout=10,  # 10 seconds timeout
            )

            assert "id" in result
            assert "status" in result
            assert result["status"] == "timeout"
            assert "error" in result
            assert "Timeout waiting for" in result["error"]
            assert "elapsed_time" in result


@pytest.mark.asyncio
async def test_cancel_crawlhtml_job(mock_environment):
    """Test cancelling a Firecrawl HTML crawl job.

    ``cancel_crawlhtml_job`` is expected to pass the job id to ``_cancel_job``
    with job type ``"crawlhtml"``.
    """
    with patch("connectors.external.firecrawl._cancel_job") as mock_cancel_job:
        mock_cancel_job.return_value = {
            "id": "test-id",
            "status": "cancelled",
            "message": "Firecrawl crawlhtml job cancelled successfully",
            "details": {"status": "cancelled"},
        }

        result = await cancel_crawlhtml_job("test-id")

        assert result["id"] == "test-id"
        assert result["status"] == "cancelled"
        assert "message" in result

        # The wrapper must select the crawlhtml job type.
        mock_cancel_job.assert_awaited_once_with("test-id", "crawlhtml")


@pytest.mark.asyncio
async def test_cancel_llmtxt_job(mock_environment):
    """Test cancelling an LLM text generation job.

    ``cancel_llmtxt_job`` is expected to pass the job id to ``_cancel_job``
    with job type ``"llmfulltxt"``; the mocked reply is an "unsupported" error.
    """
    with patch("connectors.external.firecrawl._cancel_job") as mock_cancel_job:
        mock_cancel_job.return_value = {
            "id": "test-id",
            "status": "error",
            "message": "Cancelling LLM text generation jobs is not supported.",
            "details": {"status": "error", "reason": "unsupported_operation"},
        }

        result = await cancel_llmtxt_job("test-id")

        assert result["id"] == "test-id"
        assert result["status"] == "error"
        assert "not supported" in result["message"]

        # The wrapper must select the llmfulltxt job type.
        mock_cancel_job.assert_awaited_once_with("test-id", "llmfulltxt")
@pytest.mark.asyncio
async def test_cancel_job_failure(mock_environment):
    """An exception raised by ``cancel_crawl`` is reported via an ``error`` entry."""
    # Firecrawl client whose cancel call always blows up.
    failing_app = MagicMock()
    failing_app.cancel_crawl.side_effect = Exception("Test exception")

    with patch(
        "connectors.external.firecrawl.FirecrawlApp", return_value=failing_app
    ):
        outcome = await _cancel_job("test-id", "crawlhtml")

    # The failure is reported in the result dict.
    assert "error" in outcome
    assert "Error cancelling crawlhtml job" in outcome["error"]


def test_process_llmtxt_results():
    """Generated LLM text is written to ``llmfull.txt``; empty input writes nothing new."""
    generated_text = "This is the generated LLM text content for test."
    payload = {"data": {"llmsfulltxt": generated_text}}

    with tempfile.TemporaryDirectory() as temp_dir:
        written = _process_llmtxt_results(payload, temp_dir)

        # The text must land in llmfull.txt inside the output directory.
        output_path = Path(temp_dir) / "llmfull.txt"
        assert output_path.exists()
        assert output_path.read_text() == generated_text

        # Exactly one file is reported for a populated payload.
        assert written == 1

        # A result without data is reported as zero files.
        assert _process_llmtxt_results({}, temp_dir) == 0