Skip to main content
Glama

Keboola Explorer MCP Server

test_jobs.py (8.96 kB)
import asyncio
import logging

import pytest
from mcp.server.fastmcp import Context

from integtests.conftest import ConfigDef, ProjectDef
from keboola_mcp_server.clients.client import KeboolaClient
from keboola_mcp_server.links import Link
from keboola_mcp_server.tools.components import create_config
from keboola_mcp_server.tools.jobs import JobDetail, ListJobsOutput, get_job, list_jobs, run_job

LOG = logging.getLogger(__name__)


def _expected_job_links(project_id, job_id: str) -> frozenset[Link]:
    """
    Build the set of links the tests expect on a job detail.

    The set holds one 'ui-detail' link pointing at the job itself and one 'ui-dashboard' link pointing at
    the project's job queue.

    :param project_id: ID of the project; interpolated verbatim into the URLs
    :param job_id: ID of the job; interpolated verbatim into the detail link title and URL
    :return: frozenset of the two expected links, so that comparisons ignore ordering
    """
    queue_url = f'https://connection.keboola.com/admin/projects/{project_id}/queue'
    return frozenset(
        [
            Link(type='ui-detail', title=f'Job: {job_id}', url=f'{queue_url}/{job_id}'),
            Link(type='ui-dashboard', title='Jobs in the project', url=queue_url),
        ]
    )


async def _wait_for_job_in_list(
    mcp_context: Context,
    job_id: str,
    component_id: str | None,
    config_id: str | None,
    max_retries: int = 10,
    delay: float = 0.5,
) -> ListJobsOutput:
    """
    Wait for a job to appear in the job list with retry mechanism.

    Each attempt asks `list_jobs` for the 10 most recent jobs (sorted by 'startTime', descending) matching
    the given filters and checks whether the wanted job is among them.

    :param mcp_context: MCP context
    :param job_id: ID of the job to find
    :param component_id: Component ID to filter by (can be None)
    :param config_id: Config ID to filter by (can be None)
    :param max_retries: Maximum number of retry attempts
    :param delay: Delay between retries in seconds
    :return: ListJobsOutput containing the job
    :raises AssertionError: If job is not found after all retries
    """
    for attempt in range(max_retries):
        result = await list_jobs(
            ctx=mcp_context,
            component_id=component_id,
            config_id=config_id,
            limit=10,
            sort_by='startTime',
            sort_order='desc',
        )

        if job_id in {listed_job.id for listed_job in result.jobs}:
            LOG.info(f'Job {job_id} found in list after {attempt + 1} attempts')
            return result

        # Do not sleep after the final attempt; fall through to the failure below instead.
        if attempt < max_retries - 1:
            LOG.info(f'Job {job_id} not found in list, attempt {attempt + 1}/{max_retries}, retrying in {delay}s...')
            await asyncio.sleep(delay)

    raise AssertionError(f'Job {job_id} not found in job list after {max_retries} attempts')


@pytest.mark.asyncio
async def test_list_jobs_with_component_and_config_filter(mcp_context: Context, configs: list[ConfigDef]):
    """Tests that `list_jobs` works with component and config filtering."""

    # Use first config to create jobs for testing
    test_config = configs[0]
    component_id = test_config.component_id
    configuration_id = test_config.configuration_id

    started_job = await run_job(ctx=mcp_context, component_id=component_id, configuration_id=configuration_id)

    # Wait for the job to appear in the list (handles race condition)
    result = await _wait_for_job_in_list(
        mcp_context=mcp_context,
        job_id=started_job.id,
        component_id=component_id,
        config_id=configuration_id,
    )

    assert isinstance(result, ListJobsOutput)
    assert len(result.jobs) >= 1

    # Verify our created job appears in the results
    assert started_job.id in {listed_job.id for listed_job in result.jobs}

    # Every returned job must match both filters. The loop variable is deliberately distinct from
    # `started_job` so the job created above is not rebound.
    for listed_job in result.jobs:
        assert listed_job.component_id == component_id
        assert listed_job.config_id == configuration_id


@pytest.mark.asyncio
async def test_run_job_and_get_job(mcp_context: Context, configs: list[ConfigDef], keboola_project: ProjectDef):
    """Tests that `run_job` creates a job and `get_job` retrieves its details."""

    project_id = keboola_project.project_id

    test_config = configs[0]
    component_id = test_config.component_id
    configuration_id = test_config.configuration_id

    started_job = await run_job(ctx=mcp_context, component_id=component_id, configuration_id=configuration_id)

    # Verify the started job response
    assert isinstance(started_job, JobDetail)
    assert started_job.id is not None
    assert started_job.component_id == component_id
    assert started_job.config_id == configuration_id
    assert started_job.status is not None
    assert frozenset(started_job.links) == _expected_job_links(project_id, started_job.id)

    job_detail = await get_job(job_id=started_job.id, ctx=mcp_context)

    # Verify the job detail response
    assert isinstance(job_detail, JobDetail)
    assert job_detail.id == started_job.id
    assert job_detail.component_id == component_id
    assert job_detail.config_id == configuration_id
    assert job_detail.status is not None
    assert job_detail.url is not None
    assert frozenset(job_detail.links) == _expected_job_links(project_id, job_detail.id)


@pytest.mark.asyncio
async def test_get_job(mcp_context: Context, configs: list[ConfigDef], keboola_project: ProjectDef):
    """Tests `get_job` by creating a job and then retrieving its details."""

    project_id = keboola_project.project_id

    # Use first config to create a specific job
    test_config = configs[0]
    component_id = test_config.component_id
    configuration_id = test_config.configuration_id

    # Create a specific job to test get_job with
    created_job = await run_job(ctx=mcp_context, component_id=component_id, configuration_id=configuration_id)

    # Now test get_job on the job we just created
    job_detail = await get_job(job_id=created_job.id, ctx=mcp_context)

    # Verify all expected fields are present
    assert isinstance(job_detail, JobDetail)
    assert job_detail.id == created_job.id
    assert job_detail.component_id == component_id
    assert job_detail.config_id == configuration_id
    assert job_detail.status is not None
    assert frozenset(job_detail.links) == _expected_job_links(project_id, created_job.id)


@pytest.mark.asyncio
async def test_run_job_with_newly_created_config(
    mcp_context: Context, configs: list[ConfigDef], keboola_project: ProjectDef
):
    """Tests that `run_job` works with a newly created configuration."""

    project_id = keboola_project.project_id

    test_config = configs[0]
    component_id = test_config.component_id

    # Create a new configuration for testing
    new_config = await create_config(
        ctx=mcp_context,
        name='Test Config for Job Run',
        description='Test configuration created for job run test',
        component_id=component_id,
        parameters={},
        storage={},
    )

    try:
        # Run a job on the new configuration
        started_job = await run_job(
            ctx=mcp_context, component_id=component_id, configuration_id=new_config.configuration_id
        )

        # Verify the job was started successfully
        assert isinstance(started_job, JobDetail)
        assert started_job.id is not None
        assert started_job.component_id == component_id
        assert started_job.config_id == new_config.configuration_id
        assert started_job.status is not None
        assert frozenset(started_job.links) == _expected_job_links(project_id, started_job.id)

        # Verify job can be retrieved
        job_detail = await get_job(job_id=started_job.id, ctx=mcp_context)
        assert isinstance(job_detail, JobDetail)
        assert job_detail.id == started_job.id
        assert job_detail.component_id == component_id
        assert job_detail.config_id == new_config.configuration_id

    finally:
        # Clean up: Delete the configuration (runs even when an assertion above fails)
        client = KeboolaClient.from_state(mcp_context.session.state)
        await client.storage_client.configuration_delete(
            component_id=component_id, configuration_id=new_config.configuration_id, skip_trash=True
        )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keboola/keboola-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.