test_jobs.py•12.6 kB
from datetime import datetime
from typing import Any, Type, Union
import pytest
from httpx import HTTPError
from mcp.server.fastmcp import Context
from pytest_mock import MockerFixture
from keboola_mcp_server.clients.client import KeboolaClient
from keboola_mcp_server.links import Link
from keboola_mcp_server.tools.jobs import (
    JobDetail,
    JobListItem,
    ListJobsOutput,
    get_job,
    list_jobs,
    run_job,
)
@pytest.fixture
def mock_jobs() -> list[dict[str, Any]]:
    """list of mock jobs - simulating the api response."""
    return [
        {
            'id': '123',
            'status': 'success',
            'component': 'keboola.ex-aws-s3',
            'config': 'config-123',
            'isFinished': True,
            'createdTime': '2024-01-01T00:00:00Z',
            'startTime': '2024-01-01T00:00:01Z',
            'endTime': '2024-01-01T00:00:02Z',
            'not_a_desired_field': 'Should not be in the result',
        },
        {
            'id': '124',
            'status': 'processing',
            'component': 'keboola.ex-aws-s3',
            'config': 'config-124',
            'isFinished': False,
            'createdTime': '2024-01-01T00:00:00Z',
            'startTime': '2024-01-01T00:00:01Z',
            'endTime': '2024-01-01T00:00:02Z',
            'not_a_desired_field': 'Should not be in the result',
        },
    ]
@pytest.fixture
def mock_job() -> dict[str, Any]:
    """mock job - simulating the api response."""
    return {
        'id': '123',
        'status': 'success',
        'component': 'keboola.ex-aws-s3',
        'config': 'config-123',
        'isFinished': True,
        'createdTime': '2024-01-01T00:00:00Z',
        'startTime': '2024-01-01T00:00:01Z',
        'endTime': '2024-01-01T00:00:02Z',
        'url': 'https://connection.keboola.com/jobs/123',
        'configData': {'source': 'file.csv'},
        'configRow': '1',
        'runId': '456',
        'durationSeconds': 100,
        'result': {'import': 'successful'},
        'metrics': {'rows': 1000},
    }
@pytest.fixture
def iso_format() -> str:
    return '%Y-%m-%dT%H:%M:%SZ'
@pytest.mark.asyncio
async def test_list_jobs(
    mocker: MockerFixture,
    mcp_context_client: Context,
    mock_jobs: list[dict[str, Any]],
    iso_format: str,
):
    """Tests retrieve_jobs tool."""
    context = mcp_context_client
    keboola_client = KeboolaClient.from_state(context.session.state)
    keboola_client.jobs_queue_client.search_jobs_by = mocker.AsyncMock(return_value=mock_jobs)
    result = await list_jobs(context)
    assert isinstance(result, ListJobsOutput)
    assert len(result.jobs) == 2
    assert all(isinstance(job, JobListItem) for job in result.jobs)
    assert all(returned.id == expected['id'] for returned, expected in zip(result.jobs, mock_jobs))
    assert all(returned.status == expected['status'] for returned, expected in zip(result.jobs, mock_jobs))
    assert all(returned.component_id == expected['component'] for returned, expected in zip(result.jobs, mock_jobs))
    assert all(returned.config_id == expected['config'] for returned, expected in zip(result.jobs, mock_jobs))
    assert all(returned.is_finished == expected['isFinished'] for returned, expected in zip(result.jobs, mock_jobs))
    assert all(
        returned.created_time is not None
        and returned.created_time.replace(tzinfo=None) == datetime.strptime(expected['createdTime'], iso_format)
        for returned, expected in zip(result.jobs, mock_jobs)
    )
    assert all(
        returned.start_time is not None
        and returned.start_time.replace(tzinfo=None) == datetime.strptime(expected['startTime'], iso_format)
        for returned, expected in zip(result.jobs, mock_jobs)
    )
    assert all(
        returned.end_time is not None
        and returned.end_time.replace(tzinfo=None) == datetime.strptime(expected['endTime'], iso_format)
        for returned, expected in zip(result.jobs, mock_jobs)
    )
    assert all(hasattr(returned, 'not_a_desired_field') is False for returned in result.jobs)
    assert len(result.links) == 1
    keboola_client.jobs_queue_client.search_jobs_by.assert_called_once_with(
        status=None,
        component_id=None,
        config_id=None,
        limit=100,
        offset=0,
        sort_by='startTime',
        sort_order='desc',
    )
@pytest.mark.asyncio
async def test_get_job(mocker: MockerFixture, mcp_context_client: Context, mock_job: dict[str, Any], iso_format: str):
    """Tests get_job_detail tool."""
    context = mcp_context_client
    keboola_client = KeboolaClient.from_state(context.session.state)
    keboola_client.jobs_queue_client.get_job_detail = mocker.AsyncMock(return_value=mock_job)
    result = await get_job(ctx=context, job_id='123')
    assert isinstance(result, JobDetail)
    assert result.id == mock_job['id']
    assert result.status == mock_job['status']
    assert result.component_id == mock_job['component']
    assert result.config_id == mock_job['config']
    assert result.is_finished == mock_job['isFinished']
    assert result.created_time is not None
    assert result.created_time.replace(tzinfo=None) == datetime.strptime(mock_job['createdTime'], iso_format)
    assert result.start_time is not None
    assert result.start_time.replace(tzinfo=None) == datetime.strptime(mock_job['startTime'], iso_format)
    assert result.end_time is not None
    assert result.end_time.replace(tzinfo=None) == datetime.strptime(mock_job['endTime'], iso_format)
    assert result.url == mock_job['url']
    assert result.config_data == mock_job['configData']
    assert result.config_row == mock_job['configRow']
    assert result.run_id == mock_job['runId']
    assert result.duration_seconds == mock_job['durationSeconds']
    assert result.result == mock_job['result']
    keboola_client.jobs_queue_client.get_job_detail.assert_called_once_with('123')
@pytest.mark.asyncio
async def test_list_jobs_with_component_and_config_id(
    mocker: MockerFixture, mcp_context_client: Context, mock_jobs: list[dict[str, Any]]
):
    """
    Tests retrieve_jobs tool with config_id and component_id. With config_id, the tool will return
    only jobs for the given config_id and component_id.
    """
    context = mcp_context_client
    keboola_client = KeboolaClient.from_state(context.session.state)
    keboola_client.jobs_queue_client.search_jobs_by = mocker.AsyncMock(return_value=mock_jobs)
    result = await list_jobs(ctx=context, component_id='keboola.ex-aws-s3', config_id='config-123')
    assert len(result.jobs) == 2
    assert all(isinstance(job, JobListItem) for job in result.jobs)
    assert all(returned.id == expected['id'] for returned, expected in zip(result.jobs, mock_jobs))
    assert all(returned.status == expected['status'] for returned, expected in zip(result.jobs, mock_jobs))
    keboola_client.jobs_queue_client.search_jobs_by.assert_called_once_with(
        status=None,
        component_id='keboola.ex-aws-s3',
        config_id='config-123',
        sort_by='startTime',
        sort_order='desc',
        limit=100,
        offset=0,
    )
@pytest.mark.asyncio
async def test_list_jobs_with_component_id_without_config_id(
    mocker: MockerFixture, mcp_context_client: Context, mock_jobs: list[dict[str, Any]]
):
    """Tests retrieve_jobs tool with component_id and without config_id.
    It will return all jobs for the given component_id."""
    context = mcp_context_client
    keboola_client = KeboolaClient.from_state(context.session.state)
    keboola_client.jobs_queue_client.search_jobs_by = mocker.AsyncMock(return_value=mock_jobs)
    result = await list_jobs(ctx=context, component_id='keboola.ex-aws-s3')
    assert len(result.jobs) == 2
    assert all(isinstance(job, JobListItem) for job in result.jobs)
    assert all(returned.id == expected['id'] for returned, expected in zip(result.jobs, mock_jobs))
    assert all(returned.status == expected['status'] for returned, expected in zip(result.jobs, mock_jobs))
    keboola_client.jobs_queue_client.search_jobs_by.assert_called_once_with(
        status=None,
        component_id='keboola.ex-aws-s3',
        config_id=None,
        limit=100,
        offset=0,
        sort_by='startTime',
        sort_order='desc',
    )
@pytest.mark.asyncio
async def test_run_job(
    mocker: MockerFixture,
    mcp_context_client: Context,
    mock_job: dict[str, Any],
):
    """Tests run_job tool.
    :param mock_job: The newly created job details - expecting api response.
    :param mcp_context_client: The MCP context client.
    """
    context = mcp_context_client
    keboola_client = KeboolaClient.from_state(context.session.state)
    mock_job['result'] = []  # simulate empty list as returned by create job endpoint
    mock_job['status'] = 'created'  # simulate created status as returned by create job endpoint
    keboola_client.jobs_queue_client.create_job = mocker.AsyncMock(return_value=mock_job)
    component_id = mock_job['component']
    configuration_id = mock_job['config']
    job_detail = await run_job(ctx=context, component_id=component_id, configuration_id=configuration_id)
    assert isinstance(job_detail, JobDetail)
    assert job_detail.result == {}
    assert job_detail.id == mock_job['id']
    assert job_detail.status == mock_job['status']
    assert job_detail.component_id == component_id
    assert job_detail.config_id == configuration_id
    assert job_detail.result == {}
    assert set(job_detail.links) == {
        Link(
            type='ui-detail', title='Job: 123', url='https://connection.test.keboola.com/admin/projects/69420/queue/123'
        ),
        Link(
            type='ui-dashboard',
            title='Jobs in the project',
            url='https://connection.test.keboola.com/admin/projects/69420/queue',
        ),
    }
    keboola_client.jobs_queue_client.create_job.assert_called_once_with(
        component_id=component_id,
        configuration_id=configuration_id,
    )
@pytest.mark.asyncio
async def test_run_job_fail(mocker: MockerFixture, mcp_context_client: Context, mock_job: dict[str, Any]):
    """Tests run_job tool when job creation fails."""
    context = mcp_context_client
    keboola_client = KeboolaClient.from_state(context.session.state)
    keboola_client.jobs_queue_client.create_job = mocker.AsyncMock(side_effect=HTTPError('Job creation failed'))
    component_id = mock_job['component']
    configuration_id = mock_job['config']
    with pytest.raises(HTTPError):
        await run_job(ctx=context, component_id=component_id, configuration_id=configuration_id)
    keboola_client.jobs_queue_client.create_job.assert_called_once_with(
        component_id=component_id,
        configuration_id=configuration_id,
    )
@pytest.mark.parametrize(
    ('field_name', 'input_value', 'expected_result'),
    [
        ('result', [], {}),  # empty list is not a valid result type but we convert it to {}, no error
        ('result', {}, {}),  # expected empty dict, no error
        ('result', {'result': []}, {'result': []}),  # expected result type, no error
        ('result', None, {}),  # None is valid and converted to {}
        (
            'result',
            ['result1', 'result2'],
            ValueError,
        ),  # list is not a valid result type, we raise an error
        ('configData', [], {}),  # empty list is not a valid config_data type but we convert it to {}, no error
        ('configData', {}, {}),  # expected empty dict, no error
        ('configData', ['configData1', 'configData2'], ValueError),  # list is not a valid config_data type,
    ],
)
def test_job_detail_model_validate_dict_fields(
    field_name: str,
    input_value: Union[list, dict, None],
    expected_result: Union[dict, Type[Exception]],
    mock_job: dict[str, Any],
):
    """Tests JobDetail model validate for result field.
    :param input_value: The input value to validate - simulating the api response.
    :param expected_result: The expected result.
    :param mock_job: The mock job details - expecting api response.
    """
    mock_job[field_name] = input_value
    mock_job['links'] = []
    if isinstance(expected_result, type) and issubclass(expected_result, Exception):
        with pytest.raises(expected_result):
            JobDetail.model_validate(mock_job)
    else:
        job_detail = JobDetail.model_validate(mock_job)
        if field_name == 'result':
            assert job_detail.result == expected_result
        elif field_name == 'configData':
            assert job_detail.config_data == expected_result