test_jobs.py•8.96 kB
import asyncio
import logging
import pytest
from mcp.server.fastmcp import Context
from integtests.conftest import ConfigDef, ProjectDef
from keboola_mcp_server.clients.client import KeboolaClient
from keboola_mcp_server.links import Link
from keboola_mcp_server.tools.components import create_config
from keboola_mcp_server.tools.jobs import JobDetail, ListJobsOutput, get_job, list_jobs, run_job
LOG = logging.getLogger(__name__)
async def _wait_for_job_in_list(
    mcp_context: Context,
    job_id: str,
    component_id: str | None,
    config_id: str | None,
    max_retries: int = 10,
    delay: float = 0.5,
) -> ListJobsOutput:
    """
    Wait for a job to appear in the job list with retry mechanism.
    :param mcp_context: MCP context
    :param job_id: ID of the job to find
    :param component_id: Component ID to filter by (can be None)
    :param config_id: Config ID to filter by (can be None)
    :param max_retries: Maximum number of retry attempts
    :param delay: Delay between retries in seconds
    :return: ListJobsOutput containing the job
    :raises AssertionError: If job is not found after all retries
    """
    for attempt in range(max_retries):
        result = await list_jobs(
            ctx=mcp_context,
            component_id=component_id,
            config_id=config_id,
            limit=10,
            sort_by='startTime',
            sort_order='desc',
        )
        job_ids = {job.id for job in result.jobs}
        if job_id in job_ids:
            LOG.info(f'Job {job_id} found in list after {attempt + 1} attempts')
            return result
        if attempt < max_retries - 1:
            LOG.info(f'Job {job_id} not found in list, attempt {attempt + 1}/{max_retries}, retrying in {delay}s...')
            await asyncio.sleep(delay)
    raise AssertionError(f'Job {job_id} not found in job list after {max_retries} attempts')
@pytest.mark.asyncio
async def test_list_jobs_with_component_and_config_filter(mcp_context: Context, configs: list[ConfigDef]):
    """Tests that `list_jobs` works with component and config filtering."""
    # Use first config to create jobs for testing
    test_config = configs[0]
    component_id = test_config.component_id
    configuration_id = test_config.configuration_id
    job = await run_job(ctx=mcp_context, component_id=component_id, configuration_id=configuration_id)
    # Wait for the job to appear in the list (handles race condition)
    result = await _wait_for_job_in_list(
        mcp_context=mcp_context,
        job_id=job.id,
        component_id=component_id,
        config_id=configuration_id,
    )
    assert isinstance(result, ListJobsOutput)
    assert len(result.jobs) >= 1
    # Verify our created jobs appear in the results
    job_ids = {job.id for job in result.jobs}
    assert job.id in job_ids
    for job in result.jobs:
        assert job.component_id == component_id
        assert job.config_id == configuration_id
@pytest.mark.asyncio
async def test_run_job_and_get_job(mcp_context: Context, configs: list[ConfigDef], keboola_project: ProjectDef):
    """Tests that `run_job` creates a job and `get_job` retrieves its details."""
    project_id = keboola_project.project_id
    test_config = configs[0]
    component_id = test_config.component_id
    configuration_id = test_config.configuration_id
    started_job = await run_job(ctx=mcp_context, component_id=component_id, configuration_id=configuration_id)
    # Verify the started job response
    assert isinstance(started_job, JobDetail)
    assert started_job.id is not None
    assert started_job.component_id == component_id
    assert started_job.config_id == configuration_id
    assert started_job.status is not None
    assert frozenset(started_job.links) == frozenset(
        [
            Link(
                type='ui-detail',
                title=f'Job: {started_job.id}',
                url=f'https://connection.keboola.com/admin/projects/{project_id}/queue/{started_job.id}',
            ),
            Link(
                type='ui-dashboard',
                title='Jobs in the project',
                url=f'https://connection.keboola.com/admin/projects/{project_id}/queue',
            ),
        ]
    )
    job_detail = await get_job(job_id=started_job.id, ctx=mcp_context)
    # Verify the job detail response
    assert isinstance(job_detail, JobDetail)
    assert job_detail.id == started_job.id
    assert job_detail.component_id == component_id
    assert job_detail.config_id == configuration_id
    assert job_detail.status is not None
    assert job_detail.url is not None
    assert frozenset(job_detail.links) == frozenset(
        [
            Link(
                type='ui-detail',
                title=f'Job: {job_detail.id}',
                url=f'https://connection.keboola.com/admin/projects/{project_id}/queue/{job_detail.id}',
            ),
            Link(
                type='ui-dashboard',
                title='Jobs in the project',
                url=f'https://connection.keboola.com/admin/projects/{project_id}/queue',
            ),
        ]
    )
@pytest.mark.asyncio
async def test_get_job(mcp_context: Context, configs: list[ConfigDef], keboola_project: ProjectDef):
    """Tests `get_job` by creating a job and then retrieving its details."""
    project_id = keboola_project.project_id
    # Use first config to create a specific job
    test_config = configs[0]
    component_id = test_config.component_id
    configuration_id = test_config.configuration_id
    # Create a specific job to test get_job with
    created_job = await run_job(ctx=mcp_context, component_id=component_id, configuration_id=configuration_id)
    # Now test get_job on the job we just created
    job_detail = await get_job(job_id=created_job.id, ctx=mcp_context)
    # Verify all expected fields are present
    assert isinstance(job_detail, JobDetail)
    assert job_detail.id == created_job.id
    assert job_detail.component_id == component_id
    assert job_detail.config_id == configuration_id
    assert job_detail.status is not None
    assert frozenset(job_detail.links) == frozenset(
        [
            Link(
                type='ui-detail',
                title=f'Job: {created_job.id}',
                url=f'https://connection.keboola.com/admin/projects/{project_id}/queue/{created_job.id}',
            ),
            Link(
                type='ui-dashboard',
                title='Jobs in the project',
                url=f'https://connection.keboola.com/admin/projects/{project_id}/queue',
            ),
        ]
    )
@pytest.mark.asyncio
async def test_run_job_with_newly_created_config(
    mcp_context: Context, configs: list[ConfigDef], keboola_project: ProjectDef
):
    """Tests that `run_job` works with a newly created configuration."""
    project_id = keboola_project.project_id
    test_config = configs[0]
    component_id = test_config.component_id
    # Create a new configuration for testing
    new_config = await create_config(
        ctx=mcp_context,
        name='Test Config for Job Run',
        description='Test configuration created for job run test',
        component_id=component_id,
        parameters={},
        storage={},
    )
    try:
        # Run a job on the new configuration
        started_job = await run_job(
            ctx=mcp_context, component_id=component_id, configuration_id=new_config.configuration_id
        )
        # Verify the job was started successfully
        assert isinstance(started_job, JobDetail)
        assert started_job.id is not None
        assert started_job.component_id == component_id
        assert started_job.config_id == new_config.configuration_id
        assert started_job.status is not None
        assert frozenset(started_job.links) == frozenset(
            [
                Link(
                    type='ui-detail',
                    title=f'Job: {started_job.id}',
                    url=f'https://connection.keboola.com/admin/projects/{project_id}/queue/{started_job.id}',
                ),
                Link(
                    type='ui-dashboard',
                    title='Jobs in the project',
                    url=f'https://connection.keboola.com/admin/projects/{project_id}/queue',
                ),
            ]
        )
        # Verify job can be retrieved
        job_detail = await get_job(job_id=started_job.id, ctx=mcp_context)
        assert isinstance(job_detail, JobDetail)
        assert job_detail.id == started_job.id
        assert job_detail.component_id == component_id
        assert job_detail.config_id == new_config.configuration_id
    finally:
        # Clean up: Delete the configuration
        client = KeboolaClient.from_state(mcp_context.session.state)
        await client.storage_client.configuration_delete(
            component_id=component_id, configuration_id=new_config.configuration_id, skip_trash=True
        )